gql
[web_socket_link] The link doesn't send StopOperation to the backend
Hi there. Help needed.
I ran into an issue: when I cancel the StreamSubscription of a stream that WebSocketLink provides, the following code should be called:
response.onCancel = () {
  messagesSubscription?.cancel();
  _write(StopOperation(id)).catchError(response.addError);
  _requests.removeWhere((e) => e.context.entry<RequestId>()!.id == id);
};
It should then receive the SubscriptionComplete status from the backend and close the StreamController:
} else if (message is SubscriptionComplete) {
  response.close();
}
According to this and this test, it should work.
But the onCancel callback is not called. The hasListener getter shows that the controller still has listeners.
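For reference, here is a minimal sketch of the behaviour I would expect, using only dart:async and no gql classes. The function name `cancelTriggersOnCancel` is made up for illustration; it checks that cancelling the subscription of a plain single-subscription StreamController invokes its onCancel callback and leaves hasListener false.

```dart
import 'dart:async';

/// Hypothetical helper: listens to a plain StreamController, cancels the
/// subscription, and reports whether onCancel fired and the listener is gone.
Future<bool> cancelTriggersOnCancel() async {
  var cancelled = false;
  final controller = StreamController<int>();

  // Same hook WebSocketLink relies on to send StopOperation.
  controller.onCancel = () {
    cancelled = true;
  };

  final subscription = controller.stream.listen((_) {});
  controller.add(1);

  // Cancelling the only subscription should invoke onCancel.
  await subscription.cancel();

  return cancelled && !controller.hasListener;
}

Future<void> main() async {
  print(await cancelTriggersOnCancel());
}
```

With nothing between the listener and the controller, the cancel reaches onCancel directly, so anything that changes this has to sit somewhere in the link chain.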
Here's the link I created:
class AuthorizedWebSocketLink extends Link {
  AuthorizedWebSocketLink(this.url);

  final String url;
  gql.WebSocketLink? _client;

  @override
  Stream<Response> request(Request request, [NextLink? forward]) async* {
    final context = request.context;
    final headers = context.entry<HttpLinkHeaders>()?.headers ?? {};
    final controller = StreamController<Response>();
    StreamSubscription<Response>? subscription;

    _client ??= gql.WebSocketLink(
      null,
      initialPayload: {
        'authToken': 'token',
      },
      channelGenerator: () => IOWebSocketChannel.connect(
        Uri.parse(url),
        protocols: ['graphql-ws'],
        headers: headers,
      ),
    );

    // I implemented it in this way to make sure the following code will be called
    controller.onListen = () {
      subscription = _client!.request(request, forward).listen(
        controller.add,
        onDone: controller.close,
      );
    };

    // I can confirm that this will be called in my implementation and it will call subscription?.cancel() as well
    controller.onCancel = () {
      subscription?.cancel();
      controller.close();
    };

    yield* controller.stream;
  }

  void dispose() {
    _client?.dispose();
    _client = null;
  }
}
Client initialization:
_client = GraphQLClient(
  link: Link.from([authLink, errorLink, authorizedWebSocketLink]),
  cache: GraphQLCache(store: InMemoryStore()),
);
And the code that tests the implementation:
StreamSubscription<QueryResult>? _streamSubscription;

void _subscribe() {
  final vars = generated.RequestArguments(id: 179);
  final subscription = generated.Subscription(variables: vars);
  final options = SubscriptionOptions(
    document: subscription.document,
    operationName: subscription.operationName,
    variables: subscription.variables.toJson(),
  );
  _streamSubscription = _client!.subscribe(options).listen(print);
}

void _unsubscribe() {
  _streamSubscription?.cancel();
}
What am I missing? Thanks
I faced the same issue. Any updates on how to fix this?
I am currently busy with work; it might take up to two weeks to fix this issue.
Hi @agent3bood, any updates or insights on what's causing the issue?
Hi, are there any updates?
I have not worked on it yet. Is anybody willing to create a fix?
After some investigation I was able to figure out what's going on.
The issue here is connected with ErrorLink.
From its documentation, it's clear that the error link should be declared BEFORE the HTTP and WebSocket links, so that any exception from those links is handled by the error link.
Like this:
// We use the [ErrorLink] by prepending it before the terminating link
final link = Link.from([
  ErrorLink(onException: handleException),
  terminatingLink,
]);
The issue is that ErrorLink uses this code:
await for (final result in Result.captureStream(forward!(request))) {
It seems that using Result.captureStream and then calling onException (with yield* Stream.error(error); to replace the stream) prevents the StreamController in WebSocketLink from receiving the onCancel callback. As a result, the following code is never called:
_write(StopOperation(id)).catchError(response.addError);
I'm still thinking about what I can do about that, but I wanted you to be aware of the issue anyway.
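One way to check this hypothesis in isolation is a small probe that uses only dart:async. This is a sketch under assumptions, not the ErrorLink code: `forwardWithAwaitFor` is a hypothetical stand-in for a link that re-emits events from an `await for` loop (it does not use Result.captureStream), `probeCancelPropagation` is an illustrative name, and the 50 ms delays are arbitrary. It records whether the source controller's onCancel has fired shortly after the outer subscription is cancelled, and again after the source emits one more event.

```dart
import 'dart:async';

/// Hypothetical stand-in for a link that re-emits events via `await for`.
Stream<T> forwardWithAwaitFor<T>(Stream<T> source) async* {
  await for (final event in source) {
    yield event;
  }
}

/// Returns [cancelSeenBeforeNextEvent, cancelSeenAfterNextEvent].
Future<List<bool>> probeCancelPropagation() async {
  var sourceCancelled = false;
  final source = StreamController<int>();
  source.onCancel = () {
    sourceCancelled = true;
  };

  final subscription = forwardWithAwaitFor(source.stream).listen((_) {});
  source.add(1);
  await Future<void>.delayed(const Duration(milliseconds: 50));

  // Deliberately not awaited: the returned future may only complete once
  // the generator itself has finished.
  unawaited(subscription.cancel());
  await Future<void>.delayed(const Duration(milliseconds: 50));
  final before = sourceCancelled;

  // Emit one more event so the generator reaches its next `yield`.
  if (source.hasListener) {
    source.add(2);
  }
  await Future<void>.delayed(const Duration(milliseconds: 50));
  final after = sourceCancelled;

  return [before, after];
}

Future<void> main() async {
  print(await probeCancelPropagation());
}
```

If the first value is false and the second is true, the cancel only reached the source after another event arrived, which would be consistent with StopOperation not being sent when the subscription is cancelled.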