apollo-kotlin
Retrying a subscription does not renew the id and may cause an error on the server because the id is already used
Version
4.0.0-alpha.2
Summary
I have noticed a few crashes of my app when using subscriptions. It looks like the SDK has a problem parsing errors, though I am not sure exactly why. I have contacted my backend team, but letting you know may help resolve the issue faster.
My setup of ApolloClient looks like this
ApolloClient.Builder()
    .networkTransport(
        HttpNetworkTransport.Builder()
            .serverUrl(BuildConfig.BASE_URL)
            .okHttpClient(okHttpClient)
            .build()
    )
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .protocol(GraphQLWsProtocol.Factory())
            .serverUrl(BuildConfig.BASE_URL)
            .okHttpClient(okHttpClient)
            .reopenWhen { throwable, attempt ->
                if (attempt < 13) {
                    delay(2.0.pow(attempt.toDouble()).toLong())
                } else {
                    delay(5000L)
                }
                true
            }
            .build()
    )
    .build()
and subscription like this
apolloClient.subscription(BoostCompetitionSubscription(roomId))
    .toFlow()
    .mapNotNull { response ->
        val exception = response.exception
        if (exception is ApolloGraphQLException) {
            Timber.e(BoostCompetitionException(roomId, exception.errors, exception))
        }
        try {
            response.data?.boostCompetitions?.firstOrNull()?.toDomain()
        } catch (ex: Exception) {
            Timber.e(BoostCompetitionParsingException(roomId, ex.message, ex))
            null
        }
    }
Just before the crash, reopenWhen set on WebSocketNetworkTransport was fired once with attempt 0.
Is there something I could do to prevent the app from crashing?
Steps to reproduce the behavior
No response
Logs
com.apollographql.apollo3.exception.JsonDataException: Expected BEGIN_ARRAY but was BEGIN_OBJECT at path errors
at com.apollographql.apollo3.api.json.MapJsonReader.peek
at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
at com.apollographql.apollo3.api.json.MapJsonReader.beginArray
at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
at com.apollographql.apollo3.api.internal.ResponseParser.readErrors
at com.apollographql.apollo3.api.internal.ResponseParser.parse
at com.apollographql.apollo3.api.Operations.parseJsonResponse
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$map$1$2.emit$bridge
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke
at kotlinx.coroutines.flow.internal.SafeCollector.emit
at kotlinx.coroutines.flow.internal.SafeCollector.emit
at kotlinx.coroutines.flow.internal.SafeCollector.emit
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invokeSuspend
at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invokeSuspend$bridge
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$3.invoke
at androidx.compose.material.AnchoredDraggableKt$snapTo$2.invoke$bridge(SourceFile:0)
at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$1.emit
at com.apollographql.apollo3.internal.FlowsKt$transformWhile$1$invokeSuspend$$inlined$collectWhile$2.emit
at androidx.compose.ui.platform.WindowRecomposer_androidKt$createLifecycleAwareWindowRecomposer$2$onStateChanged$1$1$1$1.emit$bridge
at com.apollographql.apollo3.network.ws.WebSocketNetworkTransport$execute$$inlined$filter$1$2.emit
at androidx.compose.foundation.text.selection.SelectionMagnifierKt$rememberAnimatedMagnifierPosition$1$2.emit$bridge
at kotlinx.coroutines.flow.SubscribedFlowCollector.emit
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith
at kotlinx.coroutines.DispatchedTask.run
at kotlinx.coroutines.DispatchedTask.run
at kotlinx.coroutines.internal.LimitedDispatcher.run
at kotlinx.coroutines.scheduling.TaskImpl.run
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run
Looks like your backend is returning errors as an object instead of the expected list (spec). The response should look like
{
  "data": null,
  "errors": [
    {
      "message": "Name for character with ID 1002 could not be fetched.",
      "locations": [{ "line": 6, "column": 7 }],
      "path": ["hero", "heroFriends", 1, "name"]
    }
  ]
}
This should be fixed in your backend.
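To make the "Expected BEGIN_ARRAY but was BEGIN_OBJECT at path errors" message concrete, here is a minimal sketch of the shape check on a response that has already been decoded into a Map. The function name `describeErrors` is hypothetical and is not part of Apollo Kotlin; it only illustrates the list-versus-object distinction described above.

```kotlin
// Hypothetical helper, not part of Apollo Kotlin: reports the shape of the
// "errors" entry in a response that was already decoded into a Map.
fun describeErrors(response: Map<String, Any?>): String =
    when (val errors = response["errors"]) {
        // Absent (or null) errors: nothing to validate.
        null -> "no errors"
        // The GraphQL response format expects a list of error objects.
        is List<*> -> "spec-compliant list with ${errors.size} error(s)"
        // A single object here is what a parser expecting an array would reject.
        is Map<*, *> -> "non-compliant: errors is an object, expected a list"
        else -> "non-compliant: unexpected type for errors"
    }
```

Passing a map whose "errors" value is a list of error maps yields the spec-compliant branch, while a map whose "errors" value is itself a map yields the non-compliant one.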
I also agree this should be recoverable from the client side of things. Did you try catching errors in your Flow? Something like
apolloClient.subscription(subscription)
    .toFlow()
    .catch {
        // recover here
    }
Hey @martinbonnin thanks for the quick reply. I haven't tried a catch clause, I am just checking exceptions in mapNotNull and on the client itself. I will add catch and see how it goes. I saw you added a PR, do I get it right that this will prevent the crash? If so, when can I expect the next version? Thanks!
do I get it right that this will prevent crash?
Yes, instead of a crash, you will have an ApolloResponse with null data and non-null exception.
when can I expect next version?
We typically release ~1 a month these days but there is no fixed release schedule. You can already try in the SNAPSHOTs though
hey @martinbonnin My team was looking into the crash from the backend side and apparently it was the same issue as this one https://github.com/hasura/graphql-engine/issues/3564 We are using Hasura.
If we understand it correctly, it was the client trying to connect with the same operationId.
This 👇 is what I got from the backend team. Replaced some data with <removed>
{
  "detail": {
    "connection_info": {
      "msg": null,
      "token_expiry": "2023-08-31T20:09:54Z",
      "websocket_id": "<removed>"
    },
    "event": {
      "detail": {
        "operation_id": "cbeb464f-f79<removed>",
        "operation_name": "SomeOperation",
        "operation_type": {
          "detail": "an operation already exists with this id: <removed>",
          "type": "proto_err"
        },
        "parameterized_query_hash": null,
        "query": {
          "operationName": "SomeOperation",
          "query": "subscription <removed>"
        },
        "request_id": null
      },
      "type": "operation"
    },
    "user_vars": {
      "x-hasura-role": "user",
      "x-hasura-user-id": "<removed>"
    }
  },
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"
}
I am letting you know because maybe there is another issue with Apollo SDK that you would like to investigate.
Thanks for the follow up!
{
  "detail": { ... },
  "level": "error",
  "timestamp": "2023-08-31T20:00:30.083+0000",
  "type": "websocket-log"
}
Is that the content of a WebSocket message? Or the contents of the "errors" field? The graphql-ws protocol defines messages here. An error message should be like:
{
  id: '<unique-operation-id>';
  type: 'error';
  payload: GraphQLError[];
}
And a GraphQLError is defined here
All in all, we can make it so that Apollo Kotlin catches these errors and exposes them in ApolloResponse.exception alongside other possible errors, but from my current understanding it looks like the Hasura server returns non-compliant data in the first place.
@martinbonnin hey, thanks for your info. I just managed to reproduce this error. I was using a single collector in my view model that uses the flow from the subscription. Now I tried to set up a second collector on the same flow and that error appeared. Is it not allowed to have multiple collectors?
Excellent question. From a quick look at the code, collecting the same Flow twice will send two start messages with the same requestUuid (code). I can see how that can confuse the server.
The quick fix is to re-use your ApolloCall, not Flow:
val call = apolloClient.subscription(subscription)
launch {
    // first collector
    call.toFlow().collect { ... }
}
launch {
    // second collector
    call.toFlow().collect { ... }
}
This will make sure a new requestUuid is allocated every time. Longer term we might want to allocate a new uuid in toFlow() but I'm not 100% sure of the API implications of this yet.
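To illustrate why a reused id is rejected while a fresh id per start is accepted, here is a small self-contained simulation. `FakeServer` and `FakeCall` are hypothetical stand-ins invented for this sketch; they are not Apollo Kotlin or Hasura types, and the real protocol handling is more involved.

```kotlin
import java.util.UUID

// Hypothetical stand-in for a WebSocket GraphQL server: it tracks active
// operation ids and rejects a start message that reuses one of them.
class FakeServer {
    private val active = mutableSetOf<String>()

    fun start(id: String): String =
        // MutableSet.add returns false when the id is already present.
        if (active.add(id)) "started" else "an operation already exists with this id: $id"
}

// Hypothetical stand-in for a call: every start() allocates a fresh id,
// mirroring the behaviour attributed to call.toFlow() in the comment above.
class FakeCall(private val server: FakeServer) {
    fun start(): String = server.start(UUID.randomUUID().toString())
}
```

Starting twice with the same fixed id returns the "already exists" message on the second attempt, whereas two `FakeCall.start()` invocations both succeed because each one generates its own id.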
thanks @martinbonnin can't do that unfortunately because we use KMM and I have Apollo in a shared module exposing Flow and two collectors in the android module. I will use a single flow, not a big deal. Just wanted to raise this as allowing multiple collectors might be a potential improvement to Apollo.
hey @martinbonnin it might be an SDK issue, not sure, but I am playing a bit with subscriptions and turning the network off and on to see how reconnection behaves. I noticed this error
WebSocket Closed code='1000' reason='{"server_error_msg":"4400: an operation already exists with this id: c20d5533-40f8-4b2b-8487-024c5dba79a8"}
This is thrown after I turned the internet back on. I have a single subscription running and that error is from retryWhen on ApolloClient directly.
EDIT: Realised that to reproduce it I had to disable the internet first and then try to subscribe. Let's say you have some screen with a subscription. You turn off the internet before going to that screen, then you go there. You turn the internet back on and this error is presented. I also realised it's kind of the same issue I was reporting for multiple collectors, but this time there is only a single collector. So maybe something for the SDK to handle?
Hey @martinbonnin how are you doing? I was wondering how are you doing with the fix for this issue. I see there is a draft PR already but it's pretty old. Can we expect some progress any time soon on this?
@damianpetla apologies for the super late response here. This ended up being quite the rabbit hole...
The tl;dr of that rabbit hole is that retrying at the WebSocket layer, like was previously done, has major limitations, as you found out:
- The WebSocket doesn't know about operation ids and cannot update them when retrying.
- In general, subscriptions can happen over other transports than websockets. Multipart HTTP being another example. So retrying at the WebSocket layer does not help there.
All in all, 4.0.0-beta.5 adds a retry option on the ApolloClient itself:
ApolloClient.Builder()
    .serverUrl(sampleServer.graphqlUrl())
    .retryOnError { it.operation is Subscription }
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl(sampleServer.subscriptionsUrl())
            .build()
    )
    .build()
This code also uses an incubating WebSocketNetworkTransport that should make it a lot easier to debug things moving forward:
dependencies {
    implementation("com.apollographql.apollo3:apollo-websocket-network-transport-incubating")
}
For more details, you can also check the integration test here
The current plan is to replace the default implementation of WebSocketNetworkTransport in the next version if everything looks good with the incubating one. If you're still around and have the time to try it, I'd love to hear what you think. Again, deep apologies for the delay on this issue and thanks for raising it 🙏
Hey @martinbonnin Thanks for update, I was looking for those changes. No worries about delay. We get this SDK for free so appreciate any feedback and support 😄
I have updated to beta5 now and added retryOnError. Will see how that works. Regarding incubating, I wouldn't mind to try that as well but would appreciate some hints on it. We use it on prod so don't want to take too much risk with experimentation🙈
Here is how we setup apollo client currently:
ApolloClient.Builder()
    .networkTransport(
        HttpNetworkTransport.Builder()
            .serverUrl(applicationInfo.hasuraUrl)
            .httpEngine(KtorHttpEngine(httpClient))
            .build()
    )
    .retryOnError { it.operation is Subscription }
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .webSocketEngine(KtorWebSocketEngine(httpClient))
            .protocol(GraphQLWsProtocol.Factory())
            .serverUrl(applicationInfo.hasuraUrl)
            /**
             * ApolloClient on logout stop subscriptions and wait by default 60 sec
             * until connection is closed. Below function change that to 5 sec
             * so another user has no chance to login and re-use previous connection.
             */
            .idleTimeoutMillis(5000)
            .reopenWhen { throwable, attempt ->
                log.i(
                    throwable = throwable,
                    messageString = "Reopen when attempt $attempt"
                )
                if (throwable is ApolloWebSocketClosedException) {
                    log.w(
                        throwable = throwable,
                        messageString = "Web socket closed"
                    )
                }
                /**
                 * Power of 13 attempts give ~5 sec delay.
                 * If it's higher attempt we keep 5 sec delay max.
                 */
                exponentialDelay(attempt)
            }
            .build()
    )
    .build()
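The body of exponentialDelay(attempt) is not shown in this thread. As a rough sketch, assuming it mirrors the first reopenWhen lambda in this issue (2^attempt milliseconds, with 5 seconds as the ceiling), the delay computation could look like the function below. The name `backoffMillis` and its parameters are hypothetical, and the suspending `delay(...)` call plus the Boolean that reopenWhen returns are left out.

```kotlin
import kotlin.math.pow

// Hypothetical helper: exponential delay in milliseconds, capped at maxMillis.
// 2^12 = 4096 ms is the last value under the 5000 ms cap; from attempt 13 on
// (2^13 = 8192 ms) the cap applies.
fun backoffMillis(attempt: Long, maxMillis: Long = 5_000L): Long {
    // Double.toLong() saturates at Long.MAX_VALUE, so very large attempts
    // still end up at the cap instead of overflowing.
    val exponential = 2.0.pow(attempt.toDouble()).toLong()
    return minOf(exponential, maxMillis)
}
```

Using `minOf` instead of an explicit `attempt < 13` check keeps the cap correct if the ceiling is changed later.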
Things that I would need to address:
- webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine
- protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it is supposed to be created now with the constructor?
- There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by the SDK?
Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?
Thanks!
Hey thanks for the follow up!
- webSocketEngine cannot take KtorWebSocketEngine. Looks like there is just one implementation of WebSocketEngine inside incubating which is AppleWebSocketEngine
Indeed the WebSocketEngine has changed too. I'll work on ktor-engine-incubating. Follow up issue here
- protocol replaced with wsProtocol and there is no Factory for GraphQLWsProtocol but I guess it suppose to be created now with constructor?
TBH I'm not sure why this needed to be a factory. WsProtocol is mostly stateless and the constructors do not do any IO so it seemed better to bypass the Factory altogether.
- There is no reopenWhen which was called on network disconnections. Is it not needed anymore? Is every disconnection now passed to each subscription or is it handled silently by SDK?
Exactly. I realized recently that managing the reopen at the WebSocket layer was wrong because it doesn't work for multipart subscriptions.
In general, I like to see the WebSocket more and more as a raw socket. Just like HTTP 2 allows multiplexing several streams on a single raw socket, the WebSocket allows multiplexing several subscription streams. In terms of GraphQL APIs, how connections are pooled and reopened should not matter.
Using retryOnError {} gives more control to individually retry some subscriptions. For example, "cold" subscriptions (which depend on client state, think countTo(10)), as opposed to "warm" subscriptions like currentTime, might not want to be retried.
As a nice bonus, 4.0.0-beta.5 uses Android/iOS APIs to retry when network connectivity comes back instead of exponential backoff, for reduced latency.
Additionally, you can see that I am reducing idle timeout. I discovered that re-login user kept using previous connection. Do you know if there is a better way to invalidate connection?
Right. That's actually the (big) difference between a GraphQL WebSocket and a raw HTTP2 socket: the GraphQL WebSocket is stateful. I'm not a huge fan of this. Can you share more details about how authentication is handled by your backend? Especially how do you know a subscription has expired? I'm hoping you can handle that with a custom ApolloInterceptor
hey @martinbonnin I have played a bit with retryOnError and it's being called on every new subscription with EmptyExecutionContext. While writing this message I realised I might have misunderstood this operator. Correct me if I am wrong, but it's setting up a condition for later, when an actual error is encountered, right?
Btw, I also noticed that on individual operators I could call setRetryOnError and then read it inside retryOnError { it.operation.retryOnError }. So I could use that if I want to let individual calls decide whether they should be retried.
Oooh my..., the KDoc for ApolloClient.Builder is severely lacking. Follow up issue here.
- ApolloCall.retryOnError(Boolean) uses a boolean that lets individual operations decide if they want to be retried
- ApolloClient.Builder.retryOnError((ApolloRequest) -> Boolean) uses a function that determines the default value if no retryOnError has been set on the ApolloCall. Usually you'll do { it.operation is Subscription } to only retry subscriptions (but you might want to retry queries too)
The actual logic is done in RetryOnErrorInterceptor. It leverages coroutines to retry the Flow whenever an error happens. If no error happens then no retry is done.
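The precedence described above (a per-call value wins, otherwise the client-level function supplies the default) can be sketched with a toy model. `FakeRequest` and `shouldRetry` are hypothetical names made up for this sketch and do not reflect the actual Apollo Kotlin types or the RetryOnErrorInterceptor implementation.

```kotlin
// Hypothetical model, not the Apollo Kotlin types.
data class FakeRequest(
    val isSubscription: Boolean,
    // null means "retryOnError was not set on the call".
    val retryOnError: Boolean? = null,
)

// The per-call value takes precedence; the client-level function is only
// consulted when the call did not set anything.
fun shouldRetry(request: FakeRequest, clientDefault: (FakeRequest) -> Boolean): Boolean =
    request.retryOnError ?: clientDefault(request)
```

With a client default of `{ it.isSubscription }`, a subscription is retried unless its own `retryOnError` is explicitly false, and a query is not retried unless its own `retryOnError` is explicitly true.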
Hope this helps, since this is all pretty new there's not a lot of docs but I'll get working on this (edit: KDoc PR here).
@damianpetla 4.0.0-beta.6 has shipped with KDoc, documentation and migration guide: https://www.apollographql.com/docs/kotlin/v4/advanced/experimental-websockets. Let us know how that goes.
hey @martinbonnin I was trying it out but don't know what to do with KtorWebSocketEngine that we were using so far on the previous WebSocketNetworkTransport
Right, this part is still needed... We're actually looking into breaking down the apollo-kotlin repo into smaller ones. KtorWebSocketEngine is a lot less stable than ApolloClient and it doesn't really make sense for them to share the versioning. KtorWebSocketEngine might be a good candidate for that... It might take a while to get it published and everything. In the meantime, I'll look into sending you a gist that you can copy/paste in your project.
@damianpetla gist is here.
I have tested that it passes all the WebSocketNetworkTransportTest tests except one little corner case where it throws ApolloNetworkException instead of ApolloWebSocketClosedException here. I'll work on getting that tested and published properly but in the meantime you should be able to just copy/pasta this.
hey @martinbonnin I have tried your gist but for some reason subscriptions stopped emitting any data. I have logged the engine and I see frames being sent and received.
Can you upload a small reproducer somewhere?
Not sure really how at the moment, building something on a side would probably take too much time. I might look into the code more later and try debugging more. Swamped with work currently 😓 Even trying it was a bit stretching but not giving up yet so don't worry :)
I hear you, thanks for looking into this! WebSockets are hard because the transport isn't specified so there are a lot of variants out there. To make things worse, they are very often behind authentication, making it really hard to reproduce the issues.
I've uploaded a small playground here that uses MockServer. Hopefully it can help reproducing/understanding what's going wrong there. Let me know if you find out anything.
Hey @martinbonnin I will be looking into this again in upcoming days. I will setup first some tests for existing configurations and functions that we have and then try switching to new transport. It's been a while so just checking if there are any interesting updates.
Thanks for the follow up. No specific updates. Try with 4.0.0-beta.7. You might need 4.0.0-beta.8-SNAPSHOT because of https://github.com/apollographql/apollo-kotlin/pull/5948
hey @martinbonnin while examining the current setup I was trying to pass the bearer token with the Auth ktor plugin instead of defaultRequest. However, when the token expires WS throws this exception
com.apollographql.apollo3.exception.ApolloWebSocketClosedException: WebSocket Closed code='4403' reason='{"server_error_msg":"4403: Forbidden"}'
and this 👇 is never called. I know refreshTokens is used for HTTP 401 but would actually be great if something similar was working for WS where I could provide new token.
install(Auth) {
    bearer {
        refreshTokens {
        }
    }
}
This is still with beta.5. I have some testing setup already and I should be trying the new Apollo SDK soon, but with our own ktor engine this would not be addressed I guess.
Anyway, I was curious how would you deal with it? I will probably setup this 👇 to make sure on WSS there is always fresh token used.
defaultRequest {
    if (url.protocol == URLProtocol.WSS) {
        header(
            HttpHeaders.Authorization,
            "new token goes here"
        )
    }
}
EDIT: defaultRequest seem to be a bad idea. That if condition does not work because I get there HTTP
hey @martinbonnin maybe you could help me out simulate closing web socket by server due to expired token. I know I could simulate it with responseBody.enqueueMessage(CloseFrame(4403, "{\"server_error_msg\":\"4403: Forbidden\"}"))
and it works to some degree throwing exception that I am expecting. However, test fails with
java.lang.IllegalStateException: No more responses in queue
at com.apollographql.apollo3.mockserver.QueueMockServerHandler.handle(QueueMockServerHandler.kt:20)
at com.apollographql.apollo3.mockserver.MockServerImpl.handleRequests(MockServer.kt:187)
at com.apollographql.apollo3.mockserver.MockServerImpl.access$handleRequests(MockServer.kt:121)
at com.apollographql.apollo3.mockserver.MockServerImpl$handleRequests$1.invokeSuspend(MockServer.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:585)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:802)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:706)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:693)
I have tried enqueuing next message responseBody.enqueueMessage(TextMessage("{\"type\": \"next\", \"id\": \"$operationId\", \"payload\": $lives}"))
that I called prior to closing WS. I guess it's not the right way to do it and I cannot figure this out. Once I have this I could be testing my setups more and provide more feedback on newer Apollo versions.
Hi @damianpetla !
I guess the thing we need to understand is what your server is doing when a token has expired. Looks like it's sending a 4403 CloseFrame and requiring your clients to reconnect? If that's the case, you can use MockServer.enqueueWebSocket to enqueue a 2nd session. Not sure if that helps?
Or if you have a Ktor server around, you could as well use this one for tests instead of MockServer? Could be another option?
hey @martinbonnin 4403 is what our server returns when the token expires, and it causes a reconnection: reopenWhen on the subscription transport is fired. I have resolved my issue by separating the ktor HttpClient for the regular transport and the subscription transport. Both use a base HttpClient where logging is configured, and then:
- NetworkTransport uses the Auth plugin with loadTokens and refreshTokens. Also added Retry for 403 but that's another story
- SubscriptionTransport uses only the defaultRequest plugin, which sets the Authentication header on every call. That solution seems to be working well for me.
Thanks for the update! Note that I have had issues detecting the WebSocket close frame code reliably on Apple platforms. See https://youtrack.jetbrains.com/issue/KTOR-6198. You might want to use something else (maybe a dedicated GraphQL error or something like this).
But all in all, looks like this issue is resolved? If yes, mind if I close it? Or is there anything remaining?