gql icon indicating copy to clipboard operation
gql copied to clipboard

[web_socket_link] The link doesn't send StopOperation to the backend

Open vysotsky opened this issue 3 years ago • 6 comments

Hi there. Help needed.

I faced an issue that when I close StreamSubscription of a stream WebSocketLink provides, the following code should be called:

 186   response.onCancel = () {
 187     messagesSubscription?.cancel();
 188     _write(StopOperation(id)).catchError(response.addError);
 189     _requests.removeWhere((e) => e.context.entry<RequestId>()!.id == id);
 190   };

And then it should receive the SubscriptionComplete status from the backend and close the StreamController:

 170          } else if (message is SubscriptionComplete) {
 171            response.close();
 172          }

According to this and this test, it should work.

But onCancel callback is not called. hasListener method shows that it still has listeners.

Here's the link I created:

class AuthorizedWebSocketLink extends Link {
  AuthorizedWebSocketLink(this.url);

  final String url;
  gql.WebSocketLink? _client;

  @override
  Stream<Response> request(Request request, [NextLink? forward]) async* {
    final context = request.context;
    final headers = context.entry<HttpLinkHeaders>()?.headers ?? {};

    final controller = StreamController<Response>();
    StreamSubscription<Response>? subscription;

    _client ??= gql.WebSocketLink(
      null,
      initialPayload: {
        'authToken': 'token',
      },
      channelGenerator: () => IOWebSocketChannel.connect(
        Uri.parse(url),
        protocols: ['graphql-ws'],
        headers: headers,
      ),
    );

    // I implemented it in this way to make sure the following code will be called
    controller.onListen = () {
      subscription = _client!.request(request, forward).listen( 
            controller.add,
            onDone: controller.close,
          );
    };

    // I can confirm that this will be called in my implementation and it will call subscription?.cancel() as well
    controller.onCancel = () {
      subscription?.cancel();
      controller.close();
    };

    yield* controller.stream;
  }

  void dispose() {
    _client?.dispose();
    _client = null;
  }
}

Client initialization:

    _client = GraphQLClient(
      link: Link.from([authLink, errorLink, authorizedWebSocketLink]),
      cache: GraphQLCache(store: InMemoryStore()),
    );

And the code that tests the implementation:

  StreamSubscription<QueryResult>? _streamSubscription;

  void _subscribe() {
    final vars = generated.RequestArguments(id: 179);
    final subscription = generated.Subscription(variables: vars);
    final options = SubscriptionOptions(
      document: subscription.document,
      operationName: subscription.operationName,
      variables: subscription.variables.toJson(),
    );

    _streamSubscription = _client!.subscribe(options).listen(print);
  }

  void _unsubscribe() {
    _streamSubscription?.cancel();
  }

What am I missing? Thanks

vysotsky avatar Aug 04 '21 12:08 vysotsky

I faced the same issue. Any updates how to fix this?

ObranS avatar Aug 09 '21 11:08 ObranS

I am currently busy with work, I might take up two weeks to fix this issue.

agent3bood avatar Aug 09 '21 11:08 agent3bood

Hi @agent3bood, any updates or insights on what's causing the issue?

dopecoder avatar Aug 22 '21 05:08 dopecoder

Hi, are there any updates?

piasetskiy avatar Sep 15 '21 09:09 piasetskiy

I have not worked on it yet, is anybody is willing to create a fix?

agent3bood avatar Sep 15 '21 12:09 agent3bood

After some investigation I was able to figure out what's going on. The issue here is connected with ErrorLink. From its documentation, it's clear that the error link should be declared BEFORE http and web sockets links so any exception from the links will be handled in the error one. Just like that:

 // We use the [ErrorLink] by prepending it before the terminating link
   final link = Link.from([
     ErrorLink(onException: handleException),
     terminatingLink,
   ]);

The issue is that ErrorLink uses this code: 44 await for (final result in Result.captureStream(forward!(request))) {

Seems like Result.captureStream and then calling onException yield* Stream.error(error); to replace the stream prevents StreamController from WebSocketLink to receive onCancel callback. As result, the following code will never be called:

_write(StopOperation(id)).catchError(response.addError);

I'm still thinking what I can do with that but anyway want you to be aware about the issue.

vysotsky avatar Oct 21 '21 14:10 vysotsky