Subscription not reconnecting

Open vytautas-pranskunas- opened this issue 4 years ago • 5 comments

I have a subscription that works fine. However, if I make any change in the backend that causes the backend API to restart, the subscription on the client side goes into the reconnecting state and hangs, even after the API is back online.

image

vytautas-pranskunas- avatar Apr 05 '21 10:04 vytautas-pranskunas-

Same problem with subscription not reconnecting after network off/on, version 5.0.0-nullsafety.4

sokolej79 avatar May 06 '21 17:05 sokolej79

It has now started to throw errors like this: image

vytautas-pranskunas- avatar May 07 '21 09:05 vytautas-pranskunas-

Hello @vytautas-pranskunas-, could you please share an example where you use a subscription? And have you found a solution for your issue? Thanks.

redDwarf03 avatar May 16 '22 08:05 redDwarf03

Hello, yes, I have a solution: I use my own client, which works fine. I copied it from one of your other issues, which has already been closed: https://github.com/zino-hofmann/graphql-flutter/pull/888

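Code:

// Imports this snippet appears to need. The exact package paths are an
// assumption and may differ between package versions.
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:flutter/foundation.dart';
import 'package:graphql/client.dart';
import 'package:meta/meta.dart';
import 'package:rxdart/rxdart.dart';
import 'package:uuid/uuid.dart';
import 'package:uuid/uuid_util.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/status.dart' as ws_status;
import 'package:web_socket_channel/web_socket_channel.dart';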


typedef GetInitPayload = FutureOr<dynamic> Function();

/// A definition for functions that returns a connected [WebSocketChannel]
typedef WebSocketConnect = FutureOr<WebSocketChannel> Function(
  Uri uri,
  Iterable<String>? protocols,
);

// create uuid generator
const _uuid = Uuid(options: {'grng': UuidUtil.cryptoRNG});

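/// Holds the function that (re)starts a subscription, plus a flag recording
/// whether the start message has already been sent on the current connection.
class SubscriptionListener {
  /// Re-runs the subscription's `onListen` logic after a reconnect.
  Function callback;

  /// Reset to false on connection loss so the subscription is started again.
  bool hasBeenTriggered = false;

  SubscriptionListener(this.callback, this.hasBeenTriggered);
}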

enum SenioSocketConnectionState { notConnected, connecting, connected }

class SenioSocketClientConfig {
  const SenioSocketClientConfig({
    this.serializer = const RequestSerializer(),
    this.parser = const ResponseParser(),
    this.autoReconnect = true,
    this.queryAndMutationTimeout = const Duration(seconds: 10),
    this.inactivityTimeout = const Duration(seconds: 30),
    this.delayBetweenReconnectionAttempts = const Duration(seconds: 5),
    this.initialPayload,
    @experimental this.connect = defaultConnect,
  });

  /// Serializer used to serialize request
  final RequestSerializer serializer;

  /// Response parser
  final ResponseParser parser;

  /// Whether to reconnect to the server after detecting connection loss.
  final bool autoReconnect;

  /// The duration after which the connection is considered unstable, because no keep alive message
  /// was received from the server in the given time-frame. The connection to the server will be closed.
  /// If [autoReconnect] is set to true, we try to reconnect to the server after the specified [delayBetweenReconnectionAttempts].
  ///
  /// If null, the keep alive messages will be ignored.
  final Duration? inactivityTimeout;

  /// The duration that needs to pass before trying to reconnect to the server after a connection loss.
  /// This only takes effect when [autoReconnect] is set to true.
  ///
  /// If null, the reconnection will occur immediately, although not recommended.
  final Duration? delayBetweenReconnectionAttempts;

  /// The duration after which a query or mutation should time out.
  /// If null, no timeout is applied, although not recommended.
  final Duration? queryAndMutationTimeout;

  /// Connect or reconnect to the websocket.
  ///
  /// Useful for supplying custom headers to an IO client, registering custom listeners,
  /// and extracting the socket for other non-graphql features.
  ///
  /// Warning: if you want to listen to the stream yourself,
  /// wrap your channel with our [GraphQLWebSocketChannel] using the `.forGraphQL()` helper:
  /// ```dart
  /// connect: (url, protocols) {
  ///    var channel = WebSocketChannel.connect(url, protocols: protocols);
  ///    // without this line, our client won't be able to listen to stream events,
  ///    // because you are already listening.
  ///    channel = channel.forGraphQL();
  ///    channel.stream.listen(myListener);
  ///    return channel;
  /// }
  /// ```
  ///
  /// To supply custom headers to an IO client:
  /// ```dart
  /// connect: (url, protocols) =>
  ///   IOWebSocketChannel.connect(url, protocols: protocols, headers: myCustomHeaders)
  /// ```
  final WebSocketConnect connect;

  static Future<WebSocketChannel> defaultConnect(
    Uri uri,
    Iterable<String>? protocols,
  ) async {
    final sock = await WebSocket.connect(uri.toString(), protocols: protocols);
    return IOWebSocketChannel(sock).forGraphQL();
  }

  /// Payload to be sent with the connection_init request.
  ///
  /// Can be a literal value, a callback, or an async callback. End value must be valid argument for `json.encode`.
  ///
  /// Internal usage is roughly:
  /// ```dart
  /// Future<InitOperation> get initOperation async {
  ///   if (initialPayload is Function) {
  ///     final dynamic payload = await initialPayload();
  ///     return InitOperation(payload);
  ///   } else {
  ///     return InitOperation(initialPayload);
  ///   }
  /// }
  /// ```
  final dynamic initialPayload;

  Future<InitOperation> get initOperation async {
    if (initialPayload is Function) {
      final dynamic payload = await initialPayload();
      return InitOperation(payload);
    } else {
      return InitOperation(initialPayload);
    }
  }
}

/// Wraps a standard web socket instance to marshal and un-marshal the server /
/// client payloads into dart object representation.
///
/// This class also deals with reconnection, handles timeout and keep alive messages.
///
/// It is meant to be instantiated once, and you can let this class handle all the heavy
/// lifting of socket state management. Once you're done with the socket connection, make sure
/// you call the [dispose] method to release all allocated resources.
class SenioSocketClient {
  SenioSocketClient(
    this.url, {
    this.protocols = const ['graphql-ws'],
    WebSocketConnect? connect,
    this.config = const SenioSocketClientConfig(),
    @visibleForTesting this.randomBytesForUuid,
    @visibleForTesting this.onMessage,
    @visibleForTesting this.onStreamError = _defaultOnStreamError,
  }) {
    _connect();
  }

  Uint8List? randomBytesForUuid;
  final String url;
  final Iterable<String>? protocols;
  final SenioSocketClientConfig config;

  final BehaviorSubject<SenioSocketConnectionState> _connectionStateController =
      BehaviorSubject<SenioSocketConnectionState>();

  final HashMap<String, SubscriptionListener> _subscriptionInitializers = HashMap();

  bool _connectionWasLost = false;

  Timer? _reconnectTimer;

  @visibleForTesting
  GraphQLWebSocketChannel? socketChannel;

  @visibleForTesting
  void Function(GraphQLSocketMessage)? onMessage;

  @visibleForTesting
  void Function(Object error, StackTrace stackTrace) onStreamError;

  Stream<GraphQLSocketMessage> get _messages => socketChannel!.messages;

  StreamSubscription<ConnectionKeepAlive>? _keepAliveSubscription;
  StreamSubscription<GraphQLSocketMessage>? _messageSubscription;

  Map<String, dynamic> Function(Request) get serialize => config.serializer.serializeRequest;

  Response Function(Map<String, dynamic>) get parse => config.parser.parseResponse;

  void _disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) {
    _keepAliveSubscription = messages.whereType<ConnectionKeepAlive>().timeout(
      config.inactivityTimeout!,
      onTimeout: (EventSink<ConnectionKeepAlive> event) {
        event.close();
        socketChannel!.sink.close(ws_status.goingAway);
        _connectionStateController.add(SenioSocketConnectionState.notConnected);
      },
    ).listen(null);
  }

  /// Connects to the server.
  ///
  /// If this instance is disposed, this method does nothing.
  Future<void> _connect() async {
    final InitOperation initOperation = await config.initOperation;

    if (_connectionStateController.isClosed) {
      return;
    }

    _connectionStateController.add(SenioSocketConnectionState.connecting);
    debugPrint('Connecting to websocket: $url...');

    try {
      final channel = await config.connect(Uri.parse(url), protocols);

      // config.connect may complete synchronously; awaiting it keeps the
      // `connected` state from overwriting `connecting` before listeners see it.
      socketChannel = channel.forGraphQL();
      _connectionStateController.add(SenioSocketConnectionState.connected);
      debugPrint('Connected to websocket.');
      _write(initOperation);

      if (config.inactivityTimeout != null) {
        _disconnectOnKeepAliveTimeout(_messages);
      }

      _messageSubscription = _messages.listen(
        onMessage,
        onDone: onConnectionLost,
        cancelOnError: true,
        onError: onStreamError,
      );

      if (_connectionWasLost) {
        for (final s in _subscriptionInitializers.values) {
          s.callback();
        }

        _connectionWasLost = false;
      }
    } catch (e) {
      onConnectionLost(e);
    }
  }

  void onConnectionLost([e]) {
    socketChannel?.sink.close(ws_status.goingAway);
    if (e != null) {
      debugPrint('There was an error causing connection lost: $e');
    }
    debugPrint('Disconnected from websocket.');
    _reconnectTimer?.cancel();
    _keepAliveSubscription?.cancel();
    _messageSubscription?.cancel();

    if (_connectionStateController.isClosed) {
      return;
    }

    _connectionWasLost = true;
    for (var s in _subscriptionInitializers.values) {
      s.hasBeenTriggered = false;
    }

    if (_connectionStateController.value != SenioSocketConnectionState.notConnected) {
      _connectionStateController.add(SenioSocketConnectionState.notConnected);
    }

    if (config.autoReconnect && !_connectionStateController.isClosed) {
      if (config.delayBetweenReconnectionAttempts != null) {
        _reconnectTimer = Timer(
          config.delayBetweenReconnectionAttempts!,
          () {
            _connect();
          },
        );
      } else {
        Timer.run(() => _connect());
      }
    }
  }

  /// Closes the underlying socket if connected, and stops reconnection attempts.
  /// After calling this method, this [SenioSocketClient] instance must be considered
  /// unusable. Instead, create a new instance of this class.
  ///
  /// Use this method if you'd like to disconnect from the specified server permanently,
  /// and you'd like to connect to another server instead of the current one.
  Future<void> dispose() async {
    debugPrint('Disposing socket client..');
    _reconnectTimer?.cancel();
    _keepAliveSubscription?.cancel();

    await Future.wait([
      socketChannel?.sink.close(ws_status.goingAway),
      _messageSubscription?.cancel(),
      _connectionStateController.close(),
    ].where((future) => future != null).cast<Future<dynamic>>().toList());
  }

  void _write(final GraphQLSocketMessage message) {
    if (_connectionStateController.value == SenioSocketConnectionState.connected) {
      socketChannel!.sink.add(
        json.encode(
          message,
          toEncodable: (dynamic m) => m.toJson(),
        ),
      );
    }
  }

  /// Sends a query, mutation or subscription request to the server, and returns a stream of the response.
  ///
  /// If the request is a query or mutation, a timeout will be applied to the request as specified by
  /// [SenioSocketClientConfig]'s [queryAndMutationTimeout] field.
  ///
  /// If the request is a subscription, no timeout is applied.
  ///
  /// If the socket disconnects, the returned stream stays open and the request is sent again
  /// once the client has reconnected (which requires `autoReconnect` to be true).
  Stream<Response> subscribe(
    final Request payload,
    final bool waitForConnection,
  ) {
    final String id = _uuid.v4(
      options: {
        'random': randomBytesForUuid,
      },
    ).toString();
    final StreamController<Response> response = StreamController<Response>();
    StreamSubscription<SenioSocketConnectionState>? sub;
    final bool addTimeout = !payload.isSubscription && config.queryAndMutationTimeout != null;

    // ignore: prefer_function_declarations_over_variables
    final onListen = () {
      final waitForConnectedStateWithoutTimeout = (waitForConnection
              ? _connectionStateController
              : _connectionStateController.startWith(SenioSocketConnectionState.connected))
          .where((SenioSocketConnectionState state) => state == SenioSocketConnectionState.connected)
          .take(1);

      final Stream<SenioSocketConnectionState> waitForConnectedState = addTimeout
          ? waitForConnectedStateWithoutTimeout.timeout(
              config.queryAndMutationTimeout!,
              onTimeout: (EventSink<SenioSocketConnectionState> event) {
                debugPrint('Connection timed out.');
                response.addError(TimeoutException('Connection timed out.'));
                event.close();
                response.close();
              },
            )
          : waitForConnectedStateWithoutTimeout;

      sub = waitForConnectedState.listen((_) {
        final Stream<GraphQLSocketMessage> dataErrorComplete = _messages.where(
          (GraphQLSocketMessage message) {
            if (message is SubscriptionData) {
              return message.id == id;
            }

            if (message is SubscriptionError) {
              return message.id == id;
            }

            if (message is SubscriptionComplete) {
              return message.id == id;
            }

            return false;
          },
        ).takeWhile((_) => !response.isClosed);

        final Stream<GraphQLSocketMessage> subscriptionComplete = addTimeout
            ? dataErrorComplete.where((message) => message is SubscriptionComplete).take(1).timeout(
                config.queryAndMutationTimeout!,
                onTimeout: (EventSink<GraphQLSocketMessage> event) {
                  debugPrint('Request timed out.');
                  response.addError(TimeoutException('Request timed out.'));
                  event.close();
                  response.close();
                },
              )
            : dataErrorComplete.where((message) => message is SubscriptionComplete).take(1);

        subscriptionComplete.listen((_) => response.close());

        dataErrorComplete
            .where((message) => message is SubscriptionData)
            .cast<SubscriptionData>()
            .listen((message) => response.add(
                  parse(message.toJson()),
                ));

        dataErrorComplete
            .where((message) => message is SubscriptionError)
            .cast<SubscriptionError>()
            .listen((message) => response.addError(message));

        if (!_subscriptionInitializers[id]!.hasBeenTriggered) {
          _write(
            StartOperation(
              id,
              serialize(payload),
            ),
          );
          _subscriptionInitializers[id]!.hasBeenTriggered = true;
        }
      });
    };

    response.onListen = onListen;

    response.onCancel = () {
      _subscriptionInitializers.remove(id);

      sub?.cancel();
      if (_connectionStateController.value == SenioSocketConnectionState.connected && socketChannel != null) {
        _write(StopOperation(id));
      }
    };

    _subscriptionInitializers[id] = SubscriptionListener(onListen, false);

    return response.stream;
  }

  /// A stream that emits the latest connection state upon subscription,
  /// followed by every subsequent state change.
  Stream<SenioSocketConnectionState> get connectionState => _connectionStateController.stream;
}

void _defaultOnStreamError(Object error, StackTrace st) {
  debugPrint('[SocketClient] message stream encountered error: $error\n'
      'stacktrace:\n${st.toString()}');
}
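
For anyone skimming the code above, the part that makes reconnection work seems to be the `_subscriptionInitializers` map: each subscription's start callback is kept, its `hasBeenTriggered` flag is reset when the connection is lost, and the callbacks are run again after a successful reconnect. Below is a minimal standalone sketch of just that idea, using only core Dart. The names `Initializer` and `ReplayRegistry` are hypothetical and are not part of graphql-flutter or of the client above.

```dart
// Hypothetical illustration of "replay subscriptions after reconnect".
// Not part of graphql-flutter; no sockets are involved.

class Initializer {
  Initializer(this.start);

  /// Sends the "start" message for one subscription.
  final void Function() start;

  /// True once `start` has run on the current connection.
  bool hasBeenTriggered = false;
}

class ReplayRegistry {
  // Map literals preserve insertion order in Dart.
  final Map<String, Initializer> _initializers = {};

  void register(String id, void Function() start) {
    _initializers[id] = Initializer(start);
  }

  void unregister(String id) {
    _initializers.remove(id);
  }

  /// Call when the socket closes: every subscription must be started again.
  void onConnectionLost() {
    for (final initializer in _initializers.values) {
      initializer.hasBeenTriggered = false;
    }
  }

  /// Call after a (re)connect. Starts every subscription that has not been
  /// started on this connection and returns how many were started.
  int onConnected() {
    var started = 0;
    for (final initializer in _initializers.values) {
      if (!initializer.hasBeenTriggered) {
        initializer.start();
        initializer.hasBeenTriggered = true;
        started++;
      }
    }
    return started;
  }
}

void main() {
  final sent = <String>[];
  final registry = ReplayRegistry()
    ..register('a', () => sent.add('start a'))
    ..register('b', () => sent.add('start b'));

  registry.onConnected(); // first connection: both are started
  registry.onConnectionLost(); // backend restarts
  registry.unregister('b'); // 'b' was cancelled while offline
  registry.onConnected(); // reconnect: only 'a' is started again

  print(sent); // prints [start a, start b, start a]
}
```

In the real client the same three steps are spread over `subscribe` (register and unregister), `onConnectionLost` (reset the flags) and `_connect` (replay the callbacks).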

vytautas-pranskunas- avatar May 16 '22 08:05 vytautas-pranskunas-