Commit a216e7be authored by Grisha Kruglov's avatar Grisha Kruglov Committed by Grisha Kruglov
Browse files

Part 6: WorkManager-based SyncManager implementation

parent 3e31742d
......@@ -188,10 +188,6 @@ projects:
path: components/service/sync-logins
description: 'A library for integrating with Firefox Sync - Logins.'
publish: true
service-sync:
path: components/service/sync
description: 'A library for managing data synchronization.'
publish: true
service-fretboard:
path: components/service/fretboard
description: 'An Android framework for segmenting users in order to run A/B tests and rollout features gradually.'
......
# [Android Components](../../../README.md) > Feature > Sync
A component which allows synchronizing groups of similar SyncableStores.
SyncableStores must share an AuthType generic type in order to be grouped.
A component which provides facilities for managing data synchronization.
Provided is an implementation of concept-sync's SyncManager, allowing
background execution of synchronization and easily observing synchronization
state.
Currently, synchronization runs periodically in the background.
## Usage
This feature is configured with a CoroutineContext and a set of SyncableStore instances.
First, set everything up:
```kotlin
// A SyncableStore instance that we want to be able to synchronize.
private val historyStorage = PlacesHistoryStorage(this)
// Add your SyncableStore instances to the global registry.
GlobalSyncableStoreProvider.configureStore("history" to historyStorage)
// Configure SyncManager with sync scope and stores to synchronize.
private val syncManager = BackgroundSyncManager("https://identity.mozilla.com/apps/oldsync").also {
// Enable synchronization of store called "history".
// Internally, it will be accessed via GlobalSyncableStoreProvider.
it.addStore("history")
}
// Configure the account manager with an instance of a SyncManager.
// This will ensure that sync manager is made aware of auth changes.
private val accountManager by lazy {
FxaAccountManager(
this,
Config.release(CLIENT_ID, REDIRECT_URL),
arrayOf("profile", "https://identity.mozilla.com/apps/oldsync"),
syncManager
)
}
```
Here is an example of how this might look for if we were using it with `services-sync-logins`.
Once account manager is in an authenticated state, sync manager will immediately
synchronize, as well as schedule periodic syncing.
First, set it up:
It's possible to re-configure which stores are to be synchronized via `addStore` and
`removeStore` calls.
It's also possible to observe sync states and request immediate sync of configured stores:
```kotlin
val loginsStoreName: String = "placesLogins"
val loginsStore by lazy {
SyncableLoginsStore(
AsyncLoginsStorageAdapter.forDatabase(File(applicationContext.filesDir, "logins.sqlite").canonicalPath)
) {
CompletableDeferred("my-not-so-secret-password")
// Define what we want to do in response to sync status changes.
private val syncObserver = object : SyncStatusObserver {
override fun onStarted() {
CoroutineScope(Dispatchers.Main).launch {
syncStatus?.text = getString(R.string.syncing)
}
}
val featureSync by lazy {
val context = Dispatchers.IO + job
FirefoxSyncFeature(context) { authInfo ->
SyncUnlockInfo(
fxaAccessToken = authInfo.fxaAccessToken,
kid = authInfo.kid,
syncKey = authInfo.syncKey,
tokenserverURL = authInfo.tokenServerUrl
)
}.also {
it.addSyncable(loginsStoreName, loginsStore)
override fun onIdle() {
CoroutineScope(Dispatchers.Main).launch {
syncStatus?.text = getString(R.string.sync_idle)
val resultTextView: TextView = findViewById(R.id.syncResult)
// Can access synchronized data now.
}
}
```
Then trigger a sync when appropriate:
```kotlin
val firefoxAccount = getAuthenticatedAccount()
val syncResult = try {
featureSync.sync(firefoxAccount).await()
} catch (e: AuthException) {
// handle auth exception...
return
override fun onError(error: Exception?) {
CoroutineScope(Dispatchers.Main).launch {
syncStatus?.text = getString(R.string.sync_error, error)
}
}
}
val loginsSyncStatus = syncResult[loginsStoreName]!!.status
if (loginsSyncStatus is SyncError) {
// handle a sync exception
} else {
// all good!
}
// Register a sync status observer with this sync manager.
syncManager.register(syncObserver, owner = this, autoPause = true)
// Kick off a sync in response to some user action. Sync will run in the
// background until it completes, surviving application shutdown if needed.
findViewById<View>(R.id.buttonSyncNow).setOnClickListener {
syncManager.syncNow()
}
```
### Setting up the dependency
......@@ -64,9 +87,6 @@ Use Gradle to download the library from [maven.mozilla.org](https://maven.mozill
implementation "org.mozilla.components:feature-sync:{latest-version}"
```
You will also likely need to pull in a `concept-storage` dependency, which
provides sync result type definitions.
## License
This Source Code Form is subject to the terms of the Mozilla Public
......
......@@ -29,6 +29,8 @@ dependencies {
// its consumers. Use 'api' here instead of 'implementation' to make that easier.
api project(':support-base')
implementation Dependencies.arch_workmanager
testImplementation Dependencies.testing_junit
testImplementation Dependencies.testing_robolectric
testImplementation Dependencies.testing_mockito
......
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package mozilla.components.feature.sync
import mozilla.components.concept.sync.OAuthAccount
import mozilla.components.concept.sync.SyncManager
import mozilla.components.concept.sync.SyncStatusObserver
import mozilla.components.concept.sync.SyncableStore
import mozilla.components.support.base.log.logger.Logger
import mozilla.components.support.base.observer.Observable
import mozilla.components.support.base.observer.ObserverRegistry
import java.io.Closeable
import java.lang.Exception
import java.util.concurrent.TimeUnit
/**
* A singleton registry of [SyncableStore] objects. [WorkManagerSyncDispatcher] will use this to
* access configured [SyncableStore] instances.
*
* This pattern provides a safe way for library-defined background workers to access globally
* available instances of stores within an application.
*/
object GlobalSyncableStoreProvider {
private val stores: MutableMap<String, SyncableStore> = mutableMapOf()
fun configureStore(storePair: Pair<String, SyncableStore>) {
stores[storePair.first] = storePair.second
}
fun getStore(name: String): SyncableStore? {
return stores[name]
}
}
/**
* Internal interface to enable testing SyncManager implementations independently from SyncDispatcher.
*/
interface SyncDispatcher : Closeable, Observable<SyncStatusObserver> {
fun isSyncActive(): Boolean
fun syncNow(startup: Boolean = false)
fun startPeriodicSync(unit: TimeUnit, period: Long)
fun stopPeriodicSync()
fun workersStateChanged(isRunning: Boolean)
}
/**
* A SyncManager implementation which uses WorkManager APIs to schedule sync tasks.
*/
class BackgroundSyncManager(
private val syncScope: String
) : GeneralSyncManager() {
override val logger = Logger("BgSyncManager")
init {
// Initialize the singleton observer. This must be called on the main thread.
WorkersLiveDataObserver.init()
}
override fun createDispatcher(stores: Set<String>, account: OAuthAccount): SyncDispatcher {
return WorkManagerSyncDispatcher(stores, syncScope, account)
}
override fun dispatcherUpdated(dispatcher: SyncDispatcher) {
WorkersLiveDataObserver.setDispatcher(dispatcher)
}
}
private val registry = ObserverRegistry<SyncStatusObserver>()
/**
* A base SyncManager implementation which manages a dispatcher, handles authentication and requests
* synchronization in the following manner:
* On authentication AND on store set changes (add or remove)...
* - immediate sync and periodic syncing are requested
* On logout...
* - periodic sync is requested to stop
*/
@SuppressWarnings("TooManyFunctions")
abstract class GeneralSyncManager
: SyncManager, Observable<SyncStatusObserver> by registry, SyncStatusObserver {
companion object {
// Periodically sync in the background, to make our syncs a little more incremental.
// This isn't strictly necessary, and could be considered an optimization.
//
// Assuming that we synchronize during app startup, our trade-offs are:
// - not syncing in the background at all might mean longer syncs, more arduous startup syncs
// - on a slow mobile network, these delays could be significant
// - a delay during startup sync may affect user experience, since our data will be stale
// for longer
// - however, background syncing eats up some of the device resources
// - ... so we only do so a few times per day
// - we also rely on the OS and the WorkManager APIs to minimize those costs. It promises to
// bundle together tasks from different applications that have similar resource-consumption
// profiles. Specifically, we need device radio to be ON to talk to our servers; OS will run
// our periodic syncs bundled with another tasks that also need radio to be ON, thus "spreading"
// the overall cost.
//
// If we wanted to be very fancy, this period could be driven by how much new activity an
// account is actually expected to generate. For now, it's just a hard-coded constant.
const val SYNC_PERIOD = 4L
val SYNC_PERIOD_UNIT = TimeUnit.HOURS
}
open val logger = Logger("GeneralSyncManager")
private var account: OAuthAccount? = null
private val stores = mutableSetOf<String>()
// A SyncDispatcher instance bound to an account and a set of syncable stores.
private var syncDispatcher: SyncDispatcher? = null
override fun addStore(name: String) = synchronized(this) {
logger.debug("Adding store: $name")
stores.add(name)
account?.let {
syncDispatcher = newDispatcher(syncDispatcher, stores, it)
initDispatcher(syncDispatcher!!)
}
Unit
}
override fun removeStore(name: String) = synchronized(this) {
logger.debug("Removing store: $name")
stores.remove(name)
account?.let {
syncDispatcher = newDispatcher(syncDispatcher, stores, it)
initDispatcher(syncDispatcher!!)
}
Unit
}
override fun authenticated(account: OAuthAccount) = synchronized(this) {
logger.debug("Authenticated")
this.account = account
syncDispatcher = newDispatcher(syncDispatcher, stores, account)
initDispatcher(syncDispatcher!!)
logger.debug("set and initialized new dispatcher: $syncDispatcher")
}
override fun loggedOut() = synchronized(this) {
logger.debug("Logging out")
account = null
syncDispatcher!!.stopPeriodicSync()
syncDispatcher!!.close()
syncDispatcher = null
}
override fun syncNow(startup: Boolean) = synchronized(this) {
logger.debug("Requesting immediate sync")
syncDispatcher!!.syncNow(startup)
}
override fun isSyncRunning(): Boolean {
syncDispatcher?.let { return it.isSyncActive() }
return false
}
abstract fun createDispatcher(stores: Set<String>, account: OAuthAccount): SyncDispatcher
abstract fun dispatcherUpdated(dispatcher: SyncDispatcher)
// Manager encapsulates events emitted by the wrapped observers, and passes them on to the outside world.
// This split allows us to define a nice public observer API (manager -> consumers), along with
// a more robust internal observer API (dispatcher -> manager). Currently the interfaces are the same.
override fun onStarted() {
notifyObservers { onStarted() }
}
override fun onIdle() {
notifyObservers { onIdle() }
}
override fun onError(error: Exception?) {
notifyObservers { onError(error) }
}
private fun newDispatcher(
currentDispatcher: SyncDispatcher?,
stores: Set<String>,
account: OAuthAccount
): SyncDispatcher {
// Let the existing dispatcher, if present, cleanup.
currentDispatcher?.close()
// TODO will events from old and new dispatchers overlap..? How do we ensure correct sequence
// for outside observers?
currentDispatcher?.unregister(this)
// Create a new dispatcher bound to current stores and account.
return createDispatcher(stores, account)
}
private fun initDispatcher(dispatcher: SyncDispatcher) {
dispatcher.register(this@GeneralSyncManager)
dispatcher.syncNow()
dispatcher.startPeriodicSync(SYNC_PERIOD_UNIT, SYNC_PERIOD)
dispatcherUpdated(dispatcher)
}
}
......@@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package mozilla.components.service.sync
package mozilla.components.feature.sync
import android.support.annotation.VisibleForTesting
import kotlinx.coroutines.sync.Mutex
......@@ -11,8 +11,8 @@ import mozilla.components.concept.sync.AuthException
import mozilla.components.concept.sync.AuthInfo
import mozilla.components.concept.sync.OAuthAccount
import mozilla.components.concept.sync.StoreSyncStatus
import mozilla.components.concept.sync.SyncError
import mozilla.components.concept.sync.SyncResult
import mozilla.components.concept.sync.SyncStatus
import mozilla.components.concept.sync.SyncStatusObserver
import mozilla.components.concept.sync.SyncableStore
import mozilla.components.support.base.log.logger.Logger
......@@ -20,8 +20,7 @@ import mozilla.components.support.base.observer.Observable
import mozilla.components.support.base.observer.ObserverRegistry
/**
* A feature implementation which orchestrates data synchronization of a set of [SyncableStore] which
* all share a common, generic [AuthInfo].
* A feature implementation which orchestrates data synchronization of a set of [SyncableStore]-s.
*/
class StorageSync(
private val syncableStores: Map<String, SyncableStore>,
......@@ -54,7 +53,7 @@ class StorageSync(
account.authInfo(syncScope)
} catch (e: AuthException) {
syncableStores.keys.forEach { storeName ->
results[storeName] = StoreSyncStatus(SyncError(e))
results[storeName] = StoreSyncStatus(SyncStatus.Error(e))
}
return@withListeners results
}
......@@ -72,7 +71,7 @@ class StorageSync(
auth: AuthInfo
): StoreSyncStatus {
return StoreSyncStatus(store.sync(auth).also {
if (it is SyncError) {
if (it is SyncStatus.Error) {
logger.error("Error synchronizing a $storeName store", it.exception)
} else {
logger.info("Synchronized $storeName store.")
......
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package mozilla.components.feature.sync
import android.content.Context
import android.support.annotation.UiThread
import androidx.work.Constraints
import androidx.work.CoroutineWorker
import androidx.work.Data
import androidx.work.ExistingPeriodicWorkPolicy
import androidx.work.ExistingWorkPolicy
import androidx.work.NetworkType
import androidx.work.OneTimeWorkRequest
import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.PeriodicWorkRequest
import androidx.work.PeriodicWorkRequestBuilder
import androidx.work.WorkInfo
import androidx.work.WorkManager
import androidx.work.WorkerParameters
import mozilla.components.concept.sync.OAuthAccount
import mozilla.components.concept.sync.SyncStatus
import mozilla.components.concept.sync.SyncStatusObserver
import mozilla.components.service.fxa.SharedPrefAccountStorage
import mozilla.components.support.base.log.logger.Logger
import mozilla.components.support.base.observer.Observable
import mozilla.components.support.base.observer.ObserverRegistry
import java.io.Closeable
import java.util.concurrent.TimeUnit
private enum class SyncWorkerTag {
Common,
Immediate,
Periodic
}
private enum class SyncWorkerName {
Periodic,
Immediate
}
/**
* A singleton wrapper around the the LiveData "forever" observer - i.e. an observer not bound
* to a lifecycle owner. This observer is always active.
* We will have different dispatcher instances throughout the lifetime of the app, but always a
* single LiveData instance.
*/
object WorkersLiveDataObserver {
private val workersLiveData = WorkManager.getInstance().getWorkInfosByTagLiveData(
SyncWorkerTag.Common.name
)
private var dispatcher: SyncDispatcher? = null
@UiThread
fun init() {
workersLiveData.observeForever {
val isRunningOrNothing = it?.any { worker -> worker.state == WorkInfo.State.RUNNING }
val isRunning = when (isRunningOrNothing) {
null -> false
false -> false
true -> true
}
dispatcher?.workersStateChanged(isRunning)
// TODO process errors coming out of worker.outputData
}
}
fun setDispatcher(dispatcher: SyncDispatcher) {
this.dispatcher = dispatcher
}
}
class WorkManagerSyncDispatcher(
private val stores: Set<String>,
private val syncScope: String,
private val account: OAuthAccount
) : SyncDispatcher, Observable<SyncStatusObserver> by ObserverRegistry(), Closeable {
private val logger = Logger("WMSyncDispatcher")
// TODO does this need to be volatile?
private var isSyncActive = false
override fun workersStateChanged(isRunning: Boolean) {
if (isSyncActive && !isRunning) {
notifyObservers { onIdle() }
isSyncActive = false
} else if (!isSyncActive && isRunning) {
notifyObservers { onStarted() }
isSyncActive = true
}
}
override fun isSyncActive(): Boolean {
return isSyncActive
}
override fun syncNow(startup: Boolean) {
logger.debug("Immediate sync requested, startup = $startup")
val delayMs = if (startup) {
// Startup delay is there to avoid SQLITE_BUSY crashes, since we currently do a poor job
// of managing database connections, and we expect there to be database writes at the start.
// FIXME https://github.com/mozilla-mobile/android-components/issues/1369
SYNC_STARTUP_DELAY_MS
} else {
0L
}
WorkManager.getInstance().beginUniqueWork(
SyncWorkerName.Immediate.name,
// Use the 'keep' policy to minimize overhead from multiple "sync now" operations coming in
// at the same time.
ExistingWorkPolicy.KEEP,
immediateSyncWorkRequest(delayMs)
).enqueue()
}
override fun close() {
stopPeriodicSync()
}
/**
* Periodic background syncing is mainly intended to reduce workload when we sync during
* application startup.
*/
override fun startPeriodicSync(unit: TimeUnit, period: Long) {
logger.debug("Starting periodic syncing, period = $period, time unit = $unit")
// Use the 'replace' policy as a simple way to upgrade periodic worker configurations across
// application versions. We do this instead of versioning workers.
WorkManager.getInstance().enqueueUniquePeriodicWork(
SyncWorkerName.Periodic.name,
ExistingPeriodicWorkPolicy.REPLACE,
periodicSyncWorkRequest(unit, period)
)
}
override fun stopPeriodicSync() {
logger.debug("Cancelling periodic syncing")
WorkManager.getInstance().cancelUniqueWork(SyncWorkerName.Periodic.name)
}
private fun periodicSyncWorkRequest(unit: TimeUnit, period: Long): PeriodicWorkRequest {
val data = getWorkerData()
// Periodic interval must be at least PeriodicWorkRequest.MIN_PERIODIC_INTERVAL_MILLIS,
// e.g. not more frequently than 15 minutes.
return PeriodicWorkRequestBuilder<WorkManagerSyncWorker>(period, unit)
.setInputData(data)
.addTag(SyncWorkerTag.Common.name)
.addTag(SyncWorkerTag.Periodic.name)
.build()
}
private fun immediateSyncWorkRequest(delayMs: Long = 0L): OneTimeWorkRequest {
val data = getWorkerData()
return OneTimeWorkRequestBuilder<WorkManagerSyncWorker>()
.setConstraints(
Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
)
.setInputData(data)
.addTag(SyncWorkerTag.Common.name)
.addTag(SyncWorkerTag.Immediate.name)
.setInitialDelay(delayMs, TimeUnit.MILLISECONDS)
.build()
}
private fun getWorkerData(): Data {
val dataBuilder = Data.Builder()
.putString("account", account.toJSONString())
.putString("syncScope", syncScope)
.putStringArray("stores", stores.toTypedArray())