apollo-kotlin icon indicating copy to clipboard operation
apollo-kotlin copied to clipboard

Retrying a subscription does not renew the id and may cause an error on the server because the id is already used

Open damianpetla opened this issue 1 year ago • 23 comments




I have noticed few crashes of my app when using subscription. It looks like SDK have problem with parsing errors. I am not sure exactly why. I have contacted my backend team but maybe letting you know can resolve that issue faster.

My setup of ApolloClient looks like this

                .reopenWhen { throwable, attempt ->
                    if (attempt < 13) {
                    } else {

and subscription like this

            .mapNotNull { response ->
                val exception = response.exception
                if (exception is ApolloGraphQLException) {
                    Timber.e(BoostCompetitionException(roomId, exception.errors, exception))
                try {
                } catch (ex: Exception) {
                    Timber.e(BoostCompetitionParsingException(roomId, ex.message, ex))

Just before the crash reopenWhen set on WebScoketNetworkTransport was fired once with attempt 0

Is there something I could do to prevent app from crashing?

Steps to reproduce the behavior

No response


com.apollographql.apollo3.exception.JsonDataException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path errors
    at com.apollographql.apollo3.api.json.MapJsonReader.peek
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
    at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
    at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
    at com.apollographql.apollo3.api.internal.ResponseParser.parse
    at com.apollographql.apollo3.api.Operations.parseJsonResponse
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit$bridge
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at kotlinx.coroutines.flow.internal.SafeCollector.emit
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend
    at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invokeSuspend$bridge
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
    at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invoke$bridge(SourceFile:0)
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit
    at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit
    at androidx.compose.ui.platform.WindowRecomposer_androidKt$createLifecycleAwareWindowRecomposer$2$onStateChanged$1$1$1$1.emit$bridge
    at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit
    at androidx.compose.foundation.text.selection.SelectionMagnifierKt$rememberAnimatedMagnifierPosition$1$2.emit$bridge
    at kotlinx.coroutines.flow.SubscribedFlowCollector.emit
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
    at kotlinx.coroutines.DispatchedTask.run
    at kotlinx.coroutines.DispatchedTask.run
    at kotlinx.coroutines.internal.LimitedDispatcher.run
    at kotlinx.coroutines.scheduling.TaskImpl.run
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run

damianpetla avatar Sep 01 '23 11:09 damianpetla

Looks like your backend is returning errors as an object instead of the expected list (spec). The response should look like

  "data": null,
  "errors": [
      "message": "Name for character with ID 1002 could not be fetched.",
      "locations": [{ "line": 6, "column": 7 }],
      "path": ["hero", "heroFriends", 1, "name"]

This should be fixed in your backend.

I also agree this should be recoverable from the client side of things. Did you try catching errors in your Flow? Something like

     .catch { 
       // recover here

martinbonnin avatar Sep 01 '23 11:09 martinbonnin

Hey @martinbonnin thanks for quick reply. I haven't tried catch clause just checking exceptions in mapNotNull and on the client itself. I will add catch and see how it goes. I saw you added some PR, do I get it right that this will prevent crash? If so, when can I expect next version? Thanks!

damianpetla avatar Sep 04 '23 13:09 damianpetla

do I get it right that this will prevent crash?

Yes, instead of a crash, you will have an ApolloResponse with null data and non-null exception instead

when can I expect next version?

We typically release ~1 a month these days but there is no fixed release schedule. You can already try in the SNAPSHOTs though

martinbonnin avatar Sep 04 '23 13:09 martinbonnin

hey @martinbonnin My team was looking into the crash from the backend side and apparently it was same issue as this one https://github.com/hasura/graphql-engine/issues/3564 We are using Hasura.

If we understand it correctly, it was the client trying to connect with the same operationId.

This 👇 what I got from the backend team. Replaced some data with <removed>

  "detail": {
    "connection_info": {
      "msg": null,
      "token_expiry": "2023-08-31T20:09:54Z",
      "websocket_id": "<removed>"
    "event": {
      "detail": {
        "operation_id": "cbeb464f-f79<removed>",
        "operation_name": "SomeOperation",
        "operation_type": {
          "detail": "an operation already exists with this id: <removed>",
          "type": "proto_err"
        "parameterized_query_hash": null,
        "query": {
          "operationName": "SomeOperation",
          "query": "subscription <removed>"
        "request_id": null
      "type": "operation"
    "user_vars": {
      "x-hasura-role": "user",
      "x-hasura-user-id": "<removed>"
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"

I am letting you know because maybe there is another issue with Apollo SDK that you would like to investigate.

damianpetla avatar Sep 06 '23 09:09 damianpetla

Thanks for the follow up!

  "detail": { ... },
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"

Is that the content of a WebSocket message? Or the contents of the "errors" field? The graphql-ws protocol defines messages here. An error message should be like:

  id: '<unique-operation-id>';
  type: 'error';
  payload: GraphQLError[];

And a GraphQLError is defined here

All in all, we can make it so that Apollo Kotlin catches these errors and exposes them in ApolloResponse.exception alongside other possible errors but from my current understanding this looks like the Hasura server returns non-compliant data in the first place.

martinbonnin avatar Sep 06 '23 09:09 martinbonnin

@martinbonnin hey, thanks for your info. I just managed to reproduce this error. I was using single collector in my view model that use flow from subscription. Now, I tried to setup second collector on the same flow and that error appeared. Is it not allowed to have multiple collectors?

damianpetla avatar Sep 19 '23 11:09 damianpetla

Excellent question. From a quick look at the code, collecting the same Flow twice will send two start messages with the same requestUuid (code). I can see how that can confuse the server.

The quick fix is to re-use your ApolloCall, not Flow:

val call = apolloClient.subscription(subscription)

launch {
  // first collector
  call.toFlow().collect { ... }
launch {
  // second collector
  call.toFlow().collect { ... }

This will make sure a new requestUuid is allocated every time. Longer term we might want to allocate a new uuid in toFlow() but I'm not 100% sure the API implications of this yet.

martinbonnin avatar Sep 19 '23 12:09 martinbonnin

thanks @martinbonnin can't do that unfortunately because we use KKM and I have Apollo in shared module exposing Flow and two collectors in android module. I will use single flow, not a big deal. Just wanted to raise this as it might be a potential improvement to Apollo allowing multiple collectors.

damianpetla avatar Sep 19 '23 13:09 damianpetla

hey @martinbonnin it might be SDK issue, not sure but I am playing a bit with subscriptions and turning off and on network to see how reconnection behave. I noticed this error

WebSocket Closed
 code='1000' reason='{"server_error_msg":"4400: an operation already exists with this
id: c20d5533-40f8-4b2b-8487-024c5dba79a8"}

This is thrown after I turned on the internet back. I have a single subscription running and that error is from retryWhen on ApolloClient directly.

EDIT: Realised that to reproduce it I had to disable internet first and then try subscribe. Lets say you have some screen with subscription. You turn of the internet before going to that screen, then you go there. You turn on the internet and this error is presented. I also realised it's kinda the same issue I was reporting for trying multiple collectors but this time I am not. It's is single collect. So maybe something for SDK to handle?

damianpetla avatar Sep 22 '23 13:09 damianpetla

Hey @martinbonnin how are you doing? I was wondering how are you doing with the fix for this issue. I see there is a draft PR already but it's pretty old. Can we expect some progress any time soon on this?

damianpetla avatar Feb 13 '24 09:02 damianpetla

@damianpetla apologies for the super late response here. This ended up being quite the rabbit hole...

The tlrd; of that rabbit hole is that retrying at the WebSocket like was previously done has major limitations as you found out:

  1. The WebSocket doesn't know about operation ids and cannot update them when retrying.
  2. In general, subscriptions can happen over other transports than websockets. Multipart HTTP being another example. So retrying at the WebSocket layer does not help there.

All in all, 4.0.0-beta.5 adds a retry option on the ApolloClient itself:

        .retryOnError { it.operation is Subscription }

This code also uses an incubating WebSocketNetworkTransport that should make it a lot easier to debug things moving forward:

dependencies {

For more details, you can also check the integration test here

The current plan is to replace the default implementation of WebSocketNetworkTransport in the next version if everything looks good with the incubating one. If you're still around and have the time to try it, I'd love to hear what you think. Again, deep apologies for the delay on this issue and thanks for raising it 🙏

martinbonnin avatar Mar 12 '24 21:03 martinbonnin

Hey @martinbonnin Thanks for update, I was looking for those changes. No worries about delay. We get this SDK for free so appreciate any feedback and support 😄

I have updated to beta5 now and added retryOnError. Will see how that works. Regarding incubating, I wouldn't mind to try that as well but would appreciate some hints on it. We use it on prod so don't want to take too much risk with experimentation🙈

Here is how we setup apollo client currently:

        .retryOnError { it.operation is Subscription }
                 * ApolloClient on logout stop subscriptions and wait by default 60 sec
                 * until connection is closed. Below function change that to 5 sec
                 * so another user has no chance to login and re-use previous connection.
                .reopenWhen { throwable, attempt ->
                        throwable = throwable,
                        messageString = "Reopen when attempt $attempt"
                    if (throwable is ApolloWebSocketClosedException) {
                            throwable = throwable,
                            messageString = "Web socket closed"
                     * Power of 13 attempts give ~5 sec delay.
                     * If it's higher attempt we keep 5 sec delay max.

Things that I would need to address:

  1. webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine
  2. protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it suppose to be created now with constructor?
  3. There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by SDK?

Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?


damianpetla avatar Mar 13 '24 11:03 damianpetla

Hey thanks for the follow up!

  1. webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine

Indeed the WebSocketEngine has changed too. I'll work on ktor-engine-incubating. Follow up issue here

  1. protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it suppose to be created now with constructor?

TBH I'm not sure why this needed to be a factory. WsProtocol is mostly stateless and the constructors do not do any IO so it seemed better to bypass the Factory altogether.

  1. There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by SDK?

Exactly. I realized recently that managing the reopen at the WebSocket layer was wrong because it doesn't work for multipart subscriptions.

In general, I like to see the WebSocket more and more as a raw socket. Just like HTTP 2 allows multiplexing several streams on a single raw socket, the WebSocket allows multiplexing several subscription streams. In terms of GraphQL APIs, how connections are pooled and reopened should not matter.

Using retryOnError {} gives more control to individually retry some subscriptions. For an example "cold" subscriptions (which depend on a client state, think countTo(10) as opposed to "warm" subscriptions like currentTime) might not want to be retried.

As a nice bonus, 4.0.0-beta.5 uses Android/iOS APIs to retry when network connectivity comes back instead of exponential backoff for reduced latency.

Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?

Right. That's actually the (big) difference between a GraphQL WebSocket and a raw HTTP2 socket is that the GraphQL WebSocket is stateful. I'm not a huge fan of this. Can you share more details about how authentication is handled by your backend? Especially how do you know a subscription has expired? I'm hoping you can handle that with a custom ApolloInterceptor

martinbonnin avatar Mar 13 '24 14:03 martinbonnin

hey @martinbonnin I have played a bit with retryOnError and it's being called on every new subscription with EmptyExecutionContext. While writing this message I realised I might have misunderstood this operator. Correct me if I am wrong but it's setting up condition for later when actual error is encountered, right?

Btw, I also noticed that on individual operators I could call setRetryOnError and read it then inside retryOnError { it.operation.retryOnError } So I could use that if I want let individual calls to decide if it should be retried.

damianpetla avatar Mar 13 '24 15:03 damianpetla

Oooh my..., the KDoc for ApolloClient.Builder is severaly lacking. Follow up issue here.

  • ApolloCall.retryOnError(Boolean) uses a boolean that lets individual operation decide if they want to be retried
  • ApolloClient.Builder.retryOnError((ApolloRequest) -> Boolean) uses a function that determines the default value if no retryOnError has been set on the ApolloCall. Usually you'll do { it.operation is Subscription } to only retry subscriptions (but you might want to retry queries too)

The actual logic is done in RetryOnErrorInterceptor. It leverages coroutines to retry the Flow whenever an error happens. If no error happens then no retry is done.

Hope this helps, since this is all pretty new there's not a lot of docs but I'll get working on this (edit: KDoc PR here).

martinbonnin avatar Mar 13 '24 15:03 martinbonnin

@damianpetla 4.0.0-beta.6 has shipped with KDoc, documentation and migration guide: https://www.apollographql.com/docs/kotlin/v4/advanced/experimental-websockets. Let us know how that goes.

martinbonnin avatar Apr 29 '24 15:04 martinbonnin

hey @martinbonnin I was trying it out but don't know what to do with KtorWebSocketEngine that we were using so far on previous WebSocketNetworkTransport

damianpetla avatar May 06 '24 09:05 damianpetla

Right, this part is still needed... We're actually looking into breaking down the apollo-kotlin repo in smaller ones. KtorWebSocketEngine is a lot less stable than ApolloClient and it doesn't really make sense for them to share the versioning. KtorWebSocketEngine might be a good candidate for that... It might take a while to get it published and everything. In the meantime, I'll look into sending you a gist that you can copy/paste in your project.

martinbonnin avatar May 06 '24 09:05 martinbonnin

@damianpetla gist is here.

I have tested it passes all the WebSocketNetworkTransportTest tests except one little corner case where it throws ApolloNetworkException instead of ApolloWebSocketClosedException here. I'll work on getting that tested and published properly but in the meantime you should be able to just copy/pasta this.

martinbonnin avatar May 07 '24 14:05 martinbonnin

hey @martinbonnin I have tried your gist but for some reason subscriptions stopped emitting any data. I have logged the engine and I see frames being send and received.

damianpetla avatar May 08 '24 08:05 damianpetla

Can you upload a small reproducer somewhere?

martinbonnin avatar May 08 '24 23:05 martinbonnin

Not sure really how at the moment, building something on a side would probably take too much time. I might look into the code more later and try debugging more. Swamped with work currently 😓 Even trying it was a bit stretching but not giving up yet so don't worry :)

damianpetla avatar May 09 '24 13:05 damianpetla

I hear you, thanks for looking into this! WebSockets are hard because the transport isn't specified so there are a lot of variants out there. To make things worse, they are very often behind authentication, making it really hard to reproduce the issues.

I've uploaded a small playground here that uses MockServer. Hopefully it can help reproducing/understanding what's going wrong there. Let me know if you find out anything.

martinbonnin avatar May 09 '24 14:05 martinbonnin

Hey @martinbonnin I will be looking into this again in upcoming days. I will setup first some tests for existing configurations and functions that we have and then try switching to new transport. It's been a while so just checking if there are any interesting updates.

damianpetla avatar Jun 12 '24 12:06 damianpetla

Thanks for the follow up. No specific updates. Try with 4.0.0-beta.7. You might need 4.0.0-beta.8-SNAPSHOT because of https://github.com/apollographql/apollo-kotlin/pull/5948

martinbonnin avatar Jun 12 '24 12:06 martinbonnin

hey @martinbonnin why examining current setup I was trying passing bearer with Auth ktor plugin instead defaultRequest. However, when token expires WS throws this exception

com.apollographql.apollo3.exception.ApolloWebSocketClosedException: WebSocket Closed code='4403' reason='{"server_error_msg":"4403: Forbidden"}'

and this 👇 is never called. I know refreshTokens is used for HTTP 401 but would actually be great if something similar was working for WS where I could provide new token.

install(Auth) {
  bearer {
    refreshTokens {

This is still with beta.5. I have some testing setup already and I should be trying new Apollo SDK soon but with own ktor engine this would not be addressed I guess.

Anyway, I was curious how would you deal with it? I will probably setup this 👇 to make sure on WSS there is always fresh token used.

        defaultRequest {
            if (url.protocol == URLProtocol.WSS) {
                    "new token goes here"

EDIT: defaultRequest seem to be a bad idea. That if condition does not work because I get there HTTP

damianpetla avatar Jun 21 '24 10:06 damianpetla

hey @martinbonnin maybe you could help me out simulate closing web socket by server due to expired token. I know I could simulate it with responseBody.enqueueMessage(CloseFrame(4403, "{\"server_error_msg\":\"4403: Forbidden\"}")) and it works to some degree throwing exception that I am expecting. However, test fails with

java.lang.IllegalStateException: No more responses in queue
	at com.apollographql.apollo3.mockserver.QueueMockServerHandler.handle(QueueMockServerHandler.kt:20)
	at com.apollographql.apollo3.mockserver.MockServerImpl.handleRequests(MockServer.kt:187)
	at com.apollographql.apollo3.mockserver.MockServerImpl.access$handleRequests(MockServer.kt:121)
	at com.apollographql.apollo3.mockserver.MockServerImpl$handleRequests$1.invokeSuspend(MockServer.kt)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:585)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:802)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:706)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:693)

I have tried enqueuing next message responseBody.enqueueMessage(TextMessage("{\"type\": \"next\", \"id\": \"$operationId\", \"payload\": $lives}")) that I called prior to closing WS. I guess it's not the right way to do it and I cannot figure this out. Once I have this I could be testing my setups more and provide more feedback on newer Apollo versions.

damianpetla avatar Jun 21 '24 15:06 damianpetla

Hi @damianpetla !

I guess the thing we need to understand is what your server is doing when a token expired. Looks like it's sending a 4403 CloseFrame and requiring your clients to reconnect? If that's the case, you can use MockServer.enqueueWebSocket to enqueue a 2nd session. Not sure that help?

Or if you have a Ktor server around, you could as well use this one for tests instead of MockServer? Could be another option?

martinbonnin avatar Jun 21 '24 15:06 martinbonnin

hey @martinbonnin 4403 is what our server return when token expire and it is causing reconnection, reopenWhen on subscription transport is fired. I have resolved my issue by separating ktor HttpClient for regular transport and subscription transport. Both use base HttpClient where logging is and then:

  • NetworkTransport uses Auth plugin with loadTokens and refreshTokens. Also added Retry for 403 but that's another story
  • SubscriptionTransport uses only defaultRequest plugin which set Authentication header on every call. That solution seems to be working well for me.

damianpetla avatar Jun 24 '24 08:06 damianpetla

Thanks for the update! Note that I have had issues detecting the WebSocket close frame code reliably on Apple platforms. See https://youtrack.jetbrains.com/issue/KTOR-6198. You might want to use some else (maybe a dedicated GraphQL error or something like this).

But all in all, looks like this issue is resolved? If yes, mind if I close it? Or is there anything remaining?

martinbonnin avatar Jun 24 '24 08:06 martinbonnin