amplify-android
DataStore Recoverability - MutationProcessor does not retry on failed mutations and should transition to LOCAL_ONLY when errored
Before opening, please confirm:
- [X] I have searched for duplicate or closed issues and discussions.
Language and Async Model
Kotlin
Amplify Categories
DataStore
Gradle script dependencies
Environment information
# Put output below this line
Picked up JAVA_TOOL_OPTIONS: -Dlog4j2.formatMsgNoLookups=true
------------------------------------------------------------
Gradle 7.3.3
------------------------------------------------------------
Build time: 2021-12-22 12:37:54 UTC
Revision: 6f556c80f945dc54b50e0be633da6c62dbe8dc71
Kotlin: 1.5.31
Groovy: 3.0.9
Ant: Apache Ant(TM) version 1.10.11 compiled on July 10 2021
JVM: 1.8.0_292 (AdoptOpenJDK 25.292-b10)
OS: Mac OS X 10.16 x86_64
Please include any relevant guides or documentation you're referencing
DataStore guides
Describe the bug
While DataStore is running in an app, the network is turned off. Saving a model to DataStore persists the model to the local store but fails to sync it to the cloud.
W/amplify:aws-datastore: Error ended observation of mutation outbox:
DataStoreException{message=Failed to process PendingMutation{mutatedItem=SerializedModel{id='34b2e6bb-58d7-4dde-bf08-554f7f5a9817', serializedData={name=name, createdAt=null, id=34b2e6bb-58d7-4dde-bf08-554f7f5a9817, updatedAt=null}, modelName=Author}, mutationType=CREATE, mutationId=4b803dcb-f30e-11ec-bbe9-b9b8b01e27b2, predicate=MatchAllQueryPredicate}, cause=null, recoverySuggestion=Check your internet connection.}
at com.amplifyframework.datastore.syncengine.MutationProcessor.drainMutationOutbox(MutationProcessor.java:119)
at com.amplifyframework.datastore.syncengine.MutationProcessor.lambda$startDrainingMutationOutbox$1$com-amplifyframework-datastore-syncengine-MutationProcessor(MutationProcessor.java:101)
at com.amplifyframework.datastore.syncengine.MutationProcessor$$ExternalSyntheticLambda3.apply(Unknown Source:4)
at io.reactivex.rxjava3.internal.operators.observable.ObservableFlatMapCompletableCompletable$FlatMapCompletableMainObserver.onNext(ObservableFlatMapCompletableCompletable.java:97)
at io.reactivex.rxjava3.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:201)
at io.reactivex.rxjava3.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:255)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:65)
at io.reactivex.rxjava3.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:56)
at java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:307)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:637)
at java.lang.Thread.run(Thread.java:1012)
After turning the network back on, the previously failed mutation and subsequent DataStore.save calls do not get processed by the MutationProcessor. This leaves me having to restart the app to get the data to sync. Sometimes, if I keep the network off long enough, I can see the websocket connections throw an error; turning the network back on then actually restarts the sync process and drains the mutation events.
// keep network off long enough, then websocket error, and DataStore transitions to "local_only" mode.
W/amplify:aws-datastore: API sync failed - transitioning to LOCAL_ONLY.
DataStoreException{message=Error during subscription., cause=ApiException{message=Subscription failed., cause=java.net.SocketException: Socket closed, recoverySuggestion=Check your Internet connection. Is your device online?}, recoverySuggestion=Evaluate details.}
at com.amplifyframework.datastore.appsync.AppSyncClient.lambda$subscription$3(AppSyncClient.java:331)
at com.amplifyframework.datastore.appsync.AppSyncClient$$ExternalSyntheticLambda1.accept(Unknown Source:4)
at com.amplifyframework.api.aws.SubscriptionOperation.lambda$null$1$com-amplifyframework-api-aws-SubscriptionOperation(SubscriptionOperation.java:87)
at com.amplifyframework.api.aws.SubscriptionOperation$$ExternalSyntheticLambda0.accept(Unknown Source:4)
at com.amplifyframework.api.aws.SubscriptionEndpoint$Subscription.dispatchError(SubscriptionEndpoint.java:432)
at com.amplifyframework.api.aws.SubscriptionEndpoint.notifyError(SubscriptionEndpoint.java:233)
at com.amplifyframework.api.aws.SubscriptionEndpoint.access$100(SubscriptionEndpoint.java:61)
at com.amplifyframework.api.aws.SubscriptionEndpoint$AmplifyWebSocketListener.onFailure(SubscriptionEndpoint.java:528)
at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:197)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:637)
at java.lang.Thread.run(Thread.java:1012)
Caused by: ApiException{message=Subscription failed., cause=java.net.SocketException: Socket closed, recoverySuggestion=Check your Internet connection. Is your device online?}
at com.amplifyframework.api.aws.SubscriptionEndpoint.notifyError(SubscriptionEndpoint.java:233)
at com.amplifyframework.api.aws.SubscriptionEndpoint.access$100(SubscriptionEndpoint.java:61)
at com.amplifyframework.api.aws.SubscriptionEndpoint$AmplifyWebSocketListener.onFailure(SubscriptionEndpoint.java:528)
at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:197)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:637)
at java.lang.Thread.run(Thread.java:1012)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:118)
at java.net.SocketInputStream.read(SocketInputStream.java:173)
at java.net.SocketInputStream.read(SocketInputStream.java:143)
at com.android.org.conscrypt.ConscryptEngineSocket$SSLInputStream.readFromSocket(ConscryptEngineSocket.java:945)
at com.android.org.conscrypt.ConscryptEngineSocket$SSLInputStream.processDataFromSocket(ConscryptEngineSocket.java:909)
at com.android.org.conscrypt.ConscryptEngineSocket$SSLInputStream.readUntilDataAvailable(ConscryptEngineSocket.java:824)
at com.android.org.conscrypt.ConscryptEngineSocket$SSLInputStream.read(ConscryptEngineSocket.java:797)
at okio.InputStreamSource.read(JvmOkio.kt:90)
at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:129)
at okio.RealBufferedSource.request(RealBufferedSource.kt:206)
at okio.RealBufferedSource.require(RealBufferedSource.kt:199)
at okio.RealBufferedSource.readByte(RealBufferedSource.kt:209)
at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.kt:119)
at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.kt:102)
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.kt:293)
at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:195)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1137)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:637)
at java.lang.Thread.run(Thread.java:1012)
I/amplify:aws-datastore: Orchestrator transitioning from SYNC_VIA_API to LOCAL_ONLY
I/amplify:aws-datastore: Setting currentState to LOCAL_ONLY
I/amplify:aws-datastore: Stopping subscription processor.
I/amplify:aws-datastore: Stopped subscription processor.
I/amplify:aws-api: No more active subscriptions. Closing web socket.
// turn network back on
// DataStore.save
I/amplify:aws-datastore: Orchestrator transitioning from LOCAL_ONLY to SYNC_VIA_API
I/amplify:aws-datastore: Setting currentState to SYNC_VIA_API
I/amplify:aws-datastore: Orchestrator lock released.
I/amplify:aws-datastore: Starting API synchronization mode.
I/amplify:aws-datastore: Starting processing subscription events.
// outgoing mutation events get processed.
Reproduction steps (if applicable)
Scenario 1 (mutations not sent to cloud while network is back up after it was turned off)
- Provision an API for DataStore
- In the app, configure APIPlugin and DataStore
- DataStore.save() and confirm data is persisted to the local store and to the cloud
- Turn network off, DataStore.save(), will log an error from the MutationProcessor
- Turn network back on, DataStore.save() persists to local store, no mutations are being sent to the cloud.
Scenario 2 (success - any changes should not cause regression to this scenario)
- DataStore.save() and confirm data is persisted to the local store and to the cloud
- Turn network off, DataStore.save(), will log an error from the MutationProcessor
- Wait until the websocket disconnects.
- Turn network back on, DataStore.save() persists to local store, mutations are being sent to the cloud.
Additional information and screenshots
MutationProcessor will dequeue a mutation event and send it to AppSync. An error is thrown from that call when the network is turned off (tested on simulator, using wifi toggle from the mac). The mutation event is not removed, which is correct. Subsequent saves persist to the local store and pending outbox. Turning the wifi back on does not drain the outgoing mutation events, and the sync engine remains in a stopped state.
Expected behavior: The MutationProcessor should retry with exponential back-off on transient network errors.
Retryability logic: When the network is back up, the next scheduled retry will send the mutation successfully. This way, the sync process remains active and keeps retrying the sync of the mutation to AppSync. To perform retries accurately, the API used needs to propagate the network error back through its interface for MutationProcessor to use. MutationProcessor uses its internal AppSyncClient, which uses Amplify.API. Once the API returns network errors in a way that can be handled, MutationProcessor can check for them and schedule the retry.
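To make the retry idea concrete, here is a minimal sketch of exponential back-off around a single call. It is not Amplify code: `BackoffRetry`, `delayMs`, and `callWithRetry` are hypothetical names, and treating any `IOException` as "transient" is an assumption standing in for whatever classification the API ends up exposing.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical helper for illustration only; not part of Amplify. */
final class BackoffRetry {
    private BackoffRetry() {}

    /** Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxDelayMs. */
    static long delayMs(int attempt, long baseDelayMs, long maxDelayMs) {
        long delay = baseDelayMs;
        // Stop doubling as soon as the cap is reached, so the value never grows unbounded.
        for (int i = 0; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    /**
     * Runs `call`, retrying with exponential back-off while it fails with an
     * IOException (assumed here to mean a transient network failure).
     * Any other exception, or the last IOException once maxAttempts is
     * exhausted, is rethrown to the caller.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts,
                               long baseDelayMs, long maxDelayMs) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (IOException transientError) {
                if (attempt + 1 >= maxAttempts) {
                    throw transientError;
                }
                Thread.sleep(delayMs(attempt, baseDelayMs, maxDelayMs));
            }
        }
    }
}
```

With a base of 100 ms and a cap of 1000 ms, the successive delays are 100, 200, 400, 800, 1000, 1000, and so on. Note that the caller still has to decide what to do when the retries are exhausted.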
Expected behavior: The sync process should transition to LOCAL_ONLY mode for recoverability when it encounters the error.
The sync process should transition to LOCAL_ONLY mode, similar to what happens in the websocket disconnect scenario. That way, subsequent saves will restart the sync process. The sync process will handle its logic based on the network status: for example, if there is a reachability check, it may not perform the initial sync. Without one, it would perform the initial sync and fail gracefully if the network is down.
The steps I would take to tackle this are:
- Handle unclassified errors from the MutationProcessor by moving the sync process to LOCAL_ONLY
- Reproduce the scenario, then call DataStore.save() while the network remains off. It should attempt to transition to SYNC_VIA_API and attempt the sync process; the outcome should be identical to starting DataStore while the network is down.
- Reproduce the scenario, then call DataStore.save() with the network back on. It should transition from LOCAL_ONLY to SYNC_VIA_API successfully and drain the outgoing mutation events.
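The transitions described in these steps can be sketched as a tiny state machine. This is only an illustration of the intended behavior, not the Orchestrator's actual implementation: `SyncModeSketch`, `onMutationProcessorError`, `onSave`, and the `startApiSync` callback are hypothetical names, and only the two modes that appear in the logs above are modeled.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the mode transitions; not the real Orchestrator. */
final class SyncModeSketch {
    enum Mode { LOCAL_ONLY, SYNC_VIA_API }

    private Mode mode = Mode.SYNC_VIA_API;
    // Assumed callback: tries to start API sync and returns true on success
    // (for example, false while the network is still down).
    private final BooleanSupplier startApiSync;

    SyncModeSketch(BooleanSupplier startApiSync) {
        this.startApiSync = startApiSync;
    }

    /** An unclassified error from the mutation processor drops us to LOCAL_ONLY. */
    synchronized void onMutationProcessorError(Throwable error) {
        mode = Mode.LOCAL_ONLY;
    }

    /** Every save while LOCAL_ONLY attempts to bring the sync process back up. */
    synchronized void onSave() {
        if (mode == Mode.LOCAL_ONLY && startApiSync.getAsBoolean()) {
            mode = Mode.SYNC_VIA_API;
        }
    }

    synchronized Mode mode() {
        return mode;
    }
}
```

The point of the design is that a failed mutation leaves the system in a state from which the next save can recover, instead of leaving the sync engine stopped until the app restarts.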
Adding retryability within the MutationProcessor
- Classify errors as transient network responses from the API used in MutationProcessor. This is checking the error returned from the API, and we may need to update how API propagates transient network failures, then handling that in MutationProcessor.
- Once classified, the retryability logic can be implemented to schedule retries. Retries should be scheduled as long as the error is a transient network response; otherwise, it should move the sync process to LOCAL_ONLY.
- Reproduce the scenario and turn the network back on; the mutation event should be sent successfully and the remaining outbox drained.
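One possible shape for the classification step is to walk the exception's cause chain and look for an I/O failure. This is a sketch under a loud assumption: it only works if the API actually attaches the underlying network exception as a cause (the subscription log above carries a `java.net.SocketException`, while the mutation error above shows `cause=null`). `TransientErrors` and `isTransientNetworkError` are hypothetical names.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical classifier for illustration; not part of Amplify. */
final class TransientErrors {
    private TransientErrors() {}

    /**
     * Returns true if `error` or any exception in its cause chain is an
     * IOException (SocketException, UnknownHostException and
     * SocketTimeoutException are all subclasses). Treating every
     * IOException as transient is an assumption of this sketch.
     */
    static boolean isTransientNetworkError(Throwable error) {
        // Identity-based set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof IOException) {
                return true;
            }
        }
        return false;
    }
}
```

A caller would schedule a retry when this returns true and move the sync process to LOCAL_ONLY otherwise.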
Notes from discussion:
There are two issues here, first, we're not correctly classifying retryable vs. non-retryable errors for the customers, and second when we encounter a non-retryable error we should be entering offline. (p2) System not coming out of offline-state should be treated as a p1.
This one is biting us pretty bad right now, as we have a large percentage of our end users running on weak LTE signal. Here is what I have found out about this bug:
- The most reliable way I have found to reproduce the issue is to throttle the network's connection speed to induce a timeout in the MutationProcessor that is handled via the aforementioned log message. I applied the throttle to the vanilla emulator using the emulator console's network speed commands. After inducing the issue, one can speed the network back up and prove connectivity through other means, but still observe the accumulation of pending mutations.
- The recovery appears to be induced by netd cleaning up the socket connections, which saves us by nuking the AppSync client's connection and forcing the orchestrator to recycle (likely in the SubscriptionProcessor). The way I have been triggering the cleanup is by cycling the WiFi antenna (on or off, it does not matter). I think this is the reason this bug was opaque to our team: our "offline" testing consisted of disabling wifi and/or enabling airplane mode.
- I have not tested if other passive network changes induce the netd cleanup (yet). I intend to perform that test to provide our users with a sure-fire workaround.
- FWIW, I have begun working on our own fix for this issue and may have something to send upstream soon (if your team does not have a fix before then).
This sounds very similar to what was fixed in https://github.com/aws-amplify/amplify-android/pull/1755
Have y'all tried to replicate on 1.36.1+?
Yes, @joekiller, we tested on 1.36.4 (specifically 7380a2c). In our testing for #1755, we were moving in and out of WiFi networks, which causes the orchestrator to recycle via the aforementioned socket cleanup; catching the exception allows the recycle to occur. But this issue describes the case of not having an explicit network event to trigger the recycle (e.g. if a device remains connected to the same LTE network for an extended period of time). Having a handler that is more robust than a logger is what I am suggesting as a fix in #1845.
Thank you @sbaxter for the context. I now understand the situation you are resolving with #1845.
#1845 would fix the problem encountered with the blockingAwait timeout encountered in #1794.
Hey @joekiller we recently made some changes and are still doing internal verification, but can you let us know if you are still experiencing this issue on your end?
@chrisbonifacio We are running a fork at this point and perhaps we could run a few of our error provoking tests but I need some clarification to understand what you are asking so I'm going to blab a bit, ask a question of clarification, and then specify a test.
The primary problem of "this issue" is the mutation processor will halt processing when an exception rises to the subscribe error handler.
In our fork we added additional handling when the subscription ends due to an error, to ensure that on any given error the mutationProcessor would restart without external Orchestrator provocation. We chose this approach because if we bubble the exception to the Orchestrator, as originally proposed in #1845, it would effectively tear down the world, i.e. halt any DataStore operations and only allow them back once the sync/subscription handshakes completed. That was a bit too heavy, so we thought the delayed restarting of startDrainingMutationOutbox to be more elegant and effective.
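As a rough illustration of the "delayed restart" idea (not the code from the fork, which is not shown in this thread), the sketch below reschedules a drain task on the same scheduler whenever it ends with an error. `RestartingDrainer`, `drainOnce`, and `restartDelayMs` are hypothetical names; the real MutationProcessor is built on RxJava, which this plain-Java sketch deliberately avoids.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a drain loop that restarts itself after errors. */
final class RestartingDrainer {
    private final ScheduledExecutorService scheduler;
    // Assumed to drain the outbox and throw a RuntimeException on failure.
    private final Runnable drainOnce;
    private final long restartDelayMs;

    RestartingDrainer(ScheduledExecutorService scheduler, Runnable drainOnce, long restartDelayMs) {
        this.scheduler = scheduler;
        this.drainOnce = drainOnce;
        this.restartDelayMs = restartDelayMs;
    }

    /** Kicks off the first drain attempt on the scheduler thread. */
    void start() {
        scheduler.execute(this::runAndRestartOnError);
    }

    private void runAndRestartOnError() {
        try {
            drainOnce.run();
        } catch (RuntimeException error) {
            // Instead of only logging and stopping, schedule another attempt.
            // Pending mutations stay in the outbox, so nothing is lost.
            scheduler.schedule(this::runAndRestartOnError, restartDelayMs, TimeUnit.MILLISECONDS);
        }
    }
}
```

A fixed delay keeps the sketch short; combining it with a back-off schedule would avoid hammering the network while it is down.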
Reading https://github.com/aws-amplify/amplify-android/commit/0cc167bd58ebb55d03050daef578a8837f44619d, it appears the code attempts two fixes:
a. One is to remove the ITEM_PROCESSING_TIMEOUT_MS.
b. The second is to introduce the retry processor.
Per point a., it would appear that this resolves instances where transmissions longer than ITEM_PROCESSING_TIMEOUT_MS cause the mutationProcessor to get an exception and halt the observation (so that's good).
Per point b. The retry processor could still emit exceptions in the case of retrying beyond maxAttempts or any exception not in the exception exclusion list which in both cases would still halt the mutationProcessor observation. (not good)
With that said, are you asking me, "Does the addition of the retryProcessor and removing the ITEM_PROCESSING_TIMEOUT_MS resolve the MutationProcessor not retrying on failed mutations?"
If that is the question, I can check; however, I believe my default test case for this situation may still cause an exception that would halt the engine. In any case, I think the points I made regarding b. would still stand.
The experiment I typically run to test this situation would be similar to the following:
Create an application that has 18 tables and writes new data to the tables every ~3 seconds. Start the application online, then remove the network to allow the pending mutations table to build to around 100-200 records. Next, give the network back to the application and allow the Orchestrator to switch to online mode and begin draining the pending mutations. While it is draining, disable the network again. It is likely that the transmission of one of those records will cause a NetworkSocket disconnected exception, which would bypass the retry handler and likely bubble up to the mutationProcessor observation, causing the outbound mutations to halt until an external event causes the Orchestrator to cycle the world again.