graphql-flutter
graphql-flutter copied to clipboard
Subscription not reconnecting
I have a subscription which works fine. However, if I make any change in the backend that causes the backend API to restart, the subscription on the client side goes into the reconnecting state and hangs even when the API comes back online.

Same problem with subscription not reconnecting after network off/on, version 5.0.0-nullsafety.4
It has now started to throw errors like this:

Hello @vytautas-pranskunas-, could you please share an example where you use the subscription? And have you found a solution for your issue? Thanks.
Hello, yes, I have a solution: I wrote my own client, which works fine. I copied it from one of your other issues, which has already been closed: https://github.com/zino-hofmann/graphql-flutter/pull/888
code:
/// Signature of a zero-argument callback that produces a value either
/// synchronously or as a [Future].
///
/// NOTE(review): presumably meant for callbacks supplying the
/// `connection_init` payload; it is not referenced in the visible code —
/// confirm before relying on it.
typedef GetInitPayload = FutureOr<dynamic> Function();
/// Signature of a function that returns a connected [WebSocketChannel],
/// either synchronously or as a [Future].
///
/// Receives the target [Uri] and the optional list of websocket
/// sub-protocols to negotiate.
typedef WebSocketConnect = FutureOr<WebSocketChannel> Function(
Uri uri,
Iterable<String>? protocols,
);
// Shared UUID generator used to mint operation ids for requests.
// The 'grng' option is set to UuidUtil.cryptoRNG (presumably selecting a
// cryptographically secure global RNG — see the uuid package docs).
const _uuid = Uuid(options: {'grng': UuidUtil.cryptoRNG});
/// Bookkeeping entry pairing a re-subscription [callback] with a flag that
/// records whether the corresponding start message has already been sent.
class SubscriptionListener {
  /// Creates an entry holding [callback] and the initial value of
  /// [hasBeenTriggered].
  SubscriptionListener(this.callback, this.hasBeenTriggered);

  /// Function to invoke in order to (re)initialize the subscription.
  Function callback;

  /// Whether the subscription has already been started on the current
  /// connection.
  bool hasBeenTriggered;
}
/// Connection states published by [SenioSocketClient].
///
/// * `notConnected` is emitted when the connection is lost or a keep-alive
///   timeout closes the socket.
/// * `connecting` is emitted when a connection attempt starts.
/// * `connected` is emitted once the websocket channel has been obtained.
enum SenioSocketConnectionState { notConnected, connecting, connected }
/// Immutable settings consumed by [SenioSocketClient]: serialization,
/// timeouts, reconnection policy, the websocket connector and the
/// `connection_init` payload.
class SenioSocketClientConfig {
  /// Creates a configuration; every argument is optional and falls back to
  /// the default shown in the signature.
  const SenioSocketClientConfig({
    this.serializer = const RequestSerializer(),
    this.parser = const ResponseParser(),
    this.autoReconnect = true,
    this.queryAndMutationTimeout = const Duration(seconds: 10),
    this.inactivityTimeout = const Duration(seconds: 30),
    this.delayBetweenReconnectionAttempts = const Duration(seconds: 5),
    this.initialPayload,
    @experimental this.connect = defaultConnect,
  });

  /// Turns outgoing requests into their JSON-compatible map form.
  final RequestSerializer serializer;

  /// Turns incoming JSON maps into responses.
  final ResponseParser parser;

  /// Whether the client should try to reconnect after it detects that the
  /// connection was lost.
  final bool autoReconnect;

  /// Longest period without a keep-alive message from the server before the
  /// connection is treated as unstable and closed.
  ///
  /// When [autoReconnect] is true, a reconnection is attempted after
  /// [delayBetweenReconnectionAttempts].
  ///
  /// If null, keep-alive messages are ignored.
  final Duration? inactivityTimeout;

  /// Pause between losing the connection and the next connection attempt.
  /// Only relevant when [autoReconnect] is true.
  ///
  /// If null, the reconnection happens immediately, which is not recommended.
  final Duration? delayBetweenReconnectionAttempts;

  /// Time limit applied to queries and mutations.
  ///
  /// If null, no limit is applied, which is not recommended.
  final Duration? queryAndMutationTimeout;

  /// Opens (or re-opens) the websocket.
  ///
  /// Supply your own function to pass custom headers to an IO client, to
  /// register custom listeners, or to keep a handle on the socket for other
  /// non-graphql features.
  ///
  /// Warning: if you want to listen to the stream yourself, wrap your channel
  /// with our [GraphQLWebSocketChannel] using the `.forGraphQL()` helper:
  /// ```dart
  /// connect: (url, protocols) {
  ///    var channel = WebSocketChannel.connect(url, protocols: protocols)
  ///    // without this line, our client won't be able to listen to stream events,
  ///    // because you are already listening.
  ///    channel = channel.forGraphQL();
  ///    channel.stream.listen(myListener)
  ///    return channel;
  /// }
  /// ```
  ///
  /// To supply custom headers to an IO client:
  /// ```dart
  /// connect: (url, protocols) =>
  ///   IOWebSocketChannel.connect(url, protocols: protocols, headers: myCustomHeaders)
  /// ```
  final WebSocketConnect connect;

  /// Default [connect] implementation: opens a `dart:io` [WebSocket] to [uri]
  /// with the given [protocols] and wraps it via `.forGraphQL()`.
  static Future<WebSocketChannel> defaultConnect(
    Uri uri,
    Iterable<String>? protocols,
  ) async {
    return IOWebSocketChannel(
      await WebSocket.connect(uri.toString(), protocols: protocols),
    ).forGraphQL();
  }

  /// Payload sent along with the `connection_init` request.
  ///
  /// May be a literal value, a callback, or an async callback. The resolved
  /// value must be a valid argument for `json.encode`.
  ///
  /// See [initOperation] for how the value is resolved.
  final dynamic initialPayload;

  /// Builds the [InitOperation] for a connection attempt.
  ///
  /// When [initialPayload] is a function it is invoked and awaited, and its
  /// result becomes the payload; otherwise [initialPayload] is used as is.
  Future<InitOperation> get initOperation async {
    final dynamic source = initialPayload;
    return InitOperation(source is Function ? await source() : source);
  }
}
/// Wraps a standard web socket instance to marshal and un-marshal the server /
/// client payloads into dart object representation.
///
/// This class also deals with reconnection, handles timeout and keep alive
/// messages.
///
/// It is meant to be instantiated once, and you can let this class handle all
/// the heavy-lifting of socket state management. Once you're done with the
/// socket connection, make sure you call the [dispose] method to release all
/// allocated resources.
class SenioSocketClient {
  /// Creates the client and immediately starts connecting to [url].
  ///
  /// The `connect` argument is accepted for call-site compatibility only; the
  /// connector actually used is [SenioSocketClientConfig.connect] of [config].
  SenioSocketClient(
    this.url, {
    this.protocols = const ['graphql-ws'],
    WebSocketConnect? connect,
    this.config = const SenioSocketClientConfig(),
    @visibleForTesting this.randomBytesForUuid,
    @visibleForTesting this.onMessage,
    @visibleForTesting this.onStreamError = _defaultOnStreamError,
  }) {
    _connect();
  }

  /// Optional fixed random bytes passed to the UUID generator (for tests).
  Uint8List? randomBytesForUuid;

  /// Address of the websocket endpoint.
  final String url;

  /// Websocket sub-protocols handed to the connector.
  final Iterable<String>? protocols;

  /// Settings controlling timeouts, reconnection and (de)serialization.
  final SenioSocketClientConfig config;

  final BehaviorSubject<SenioSocketConnectionState> _connectionStateController =
      BehaviorSubject<SenioSocketConnectionState>();

  /// Active requests keyed by operation id; used to restart them after a
  /// reconnection.
  final HashMap<String, SubscriptionListener> _subscriptionInitializers =
      HashMap();

  bool _connectionWasLost = false;

  Timer? _reconnectTimer;

  @visibleForTesting
  GraphQLWebSocketChannel? socketChannel;

  @visibleForTesting
  void Function(GraphQLSocketMessage)? onMessage;

  @visibleForTesting
  void Function(Object error, StackTrace stackTrace) onStreamError;

  Stream<GraphQLSocketMessage> get _messages => socketChannel!.messages;

  StreamSubscription<ConnectionKeepAlive>? _keepAliveSubscription;

  StreamSubscription<GraphQLSocketMessage>? _messageSubscription;

  /// Request serializer taken from [config].
  Map<String, dynamic> Function(Request) get serialize =>
      config.serializer.serializeRequest;

  /// Response parser taken from [config].
  Response Function(Map<String, dynamic>) get parse =>
      config.parser.parseResponse;

  /// Closes the socket and reports `notConnected` when no keep-alive message
  /// arrives on [messages] within [SenioSocketClientConfig.inactivityTimeout].
  ///
  /// Must only be called when that timeout is non-null.
  void _disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) {
    _keepAliveSubscription = messages.whereType<ConnectionKeepAlive>().timeout(
      config.inactivityTimeout!,
      onTimeout: (EventSink<ConnectionKeepAlive> event) {
        event.close();
        socketChannel!.sink.close(ws_status.goingAway);
        _connectionStateController.add(SenioSocketConnectionState.notConnected);
      },
    ).listen(null);
  }

  /// Connects to the server.
  ///
  /// If this instance is disposed, this method does nothing.
  ///
  /// Any failure while resolving the init payload or opening the socket is
  /// routed to [onConnectionLost], which schedules the next attempt when
  /// auto-reconnect is enabled.
  Future<void> _connect() async {
    if (_connectionStateController.isClosed) {
      return;
    }
    // Emitted before the first `await` so the state subject always holds a
    // value by the time any other member reads `.value`.
    _connectionStateController.add(SenioSocketConnectionState.connecting);
    debugPrint('Connecting to websocket: $url...');
    try {
      // Resolved inside the `try` so a throwing payload callback triggers the
      // regular reconnection path instead of an unhandled async error.
      final initOperation = await config.initOperation;
      if (_connectionStateController.isClosed) {
        return;
      }
      final channel = await config.connect(Uri.parse(url), protocols);
      if (_connectionStateController.isClosed) {
        // Disposed while the socket was being opened: release it right away.
        channel.sink.close(ws_status.goingAway);
        return;
      }
      socketChannel = channel.forGraphQL();
      _connectionStateController.add(SenioSocketConnectionState.connected);
      debugPrint('Connected to websocket.');
      _write(initOperation);
      if (config.inactivityTimeout != null) {
        _disconnectOnKeepAliveTimeout(_messages);
      }
      _messageSubscription = _messages.listen(
        onMessage,
        onDone: onConnectionLost,
        cancelOnError: true,
        onError: (Object error, StackTrace stackTrace) {
          onStreamError(error, stackTrace);
          // With `cancelOnError: true` the subscription is cancelled after the
          // first error and the done callback never runs, so the lost
          // connection has to be handled here to keep reconnection working.
          onConnectionLost(error);
        },
      );
      if (_connectionWasLost) {
        // Restart every request that was active before the connection dropped.
        for (final s in _subscriptionInitializers.values) {
          s.callback();
        }
        _connectionWasLost = false;
      }
    } catch (e) {
      onConnectionLost(e);
    }
  }

  /// Tears down the current socket, reports `notConnected` and, when
  /// auto-reconnect is enabled, schedules the next connection attempt.
  ///
  /// [e] is the optional cause; it is only logged.
  void onConnectionLost([e]) {
    socketChannel?.sink.close(ws_status.goingAway);
    if (e != null) {
      debugPrint('There was an error causing connection lost: $e');
    }
    debugPrint('Disconnected from websocket.');
    _reconnectTimer?.cancel();
    _keepAliveSubscription?.cancel();
    _messageSubscription?.cancel();
    if (_connectionStateController.isClosed) {
      return;
    }
    _connectionWasLost = true;
    // Every active request has to send its start message again after the
    // next successful connection.
    for (final s in _subscriptionInitializers.values) {
      s.hasBeenTriggered = false;
    }
    if (_connectionStateController.value !=
        SenioSocketConnectionState.notConnected) {
      _connectionStateController.add(SenioSocketConnectionState.notConnected);
    }
    if (config.autoReconnect && !_connectionStateController.isClosed) {
      final delay = config.delayBetweenReconnectionAttempts;
      if (delay != null) {
        _reconnectTimer = Timer(delay, _connect);
      } else {
        Timer.run(_connect);
      }
    }
  }

  /// Closes the underlying socket if connected, and stops reconnection
  /// attempts. After calling this method, this [SenioSocketClient] instance
  /// must be considered unusable. Instead, create a new instance of this
  /// class.
  ///
  /// Use this method if you'd like to disconnect from the specified server
  /// permanently, and you'd like to connect to another server instead of the
  /// current one.
  Future<void> dispose() async {
    debugPrint('Disposing socket client..');
    _reconnectTimer?.cancel();
    _keepAliveSubscription?.cancel();
    await Future.wait(<Future<dynamic>?>[
      socketChannel?.sink.close(ws_status.goingAway),
      _messageSubscription?.cancel(),
      _connectionStateController.close(),
    ].whereType<Future<dynamic>>());
  }

  /// JSON-encodes [message] and sends it over the socket.
  ///
  /// Does nothing unless the current state is `connected`.
  void _write(final GraphQLSocketMessage message) {
    if (_connectionStateController.value ==
        SenioSocketConnectionState.connected) {
      socketChannel!.sink.add(
        json.encode(
          message,
          toEncodable: (dynamic m) => m.toJson(),
        ),
      );
    }
  }

  /// Sends a query, mutation or subscription request to the server, and
  /// returns a stream of the response.
  ///
  /// If the request is a query or mutation, a timeout will be applied to the
  /// request as specified by [SenioSocketClientConfig]'s
  /// `queryAndMutationTimeout` field. Subscriptions get no timeout.
  ///
  /// When [waitForConnection] is true the request is only started once the
  /// client reports `connected`; otherwise it is started right away.
  ///
  /// After a reconnection, requests that are still listened to are started
  /// again on the new socket.
  Stream<Response> subscribe(
    final Request payload,
    final bool waitForConnection,
  ) {
    final String id = _uuid.v4(
      options: {
        'random': randomBytesForUuid,
      },
    ).toString();
    final StreamController<Response> response = StreamController<Response>();
    StreamSubscription<SenioSocketConnectionState>? sub;
    final bool addTimeout =
        !payload.isSubscription && config.queryAndMutationTimeout != null;

    void onListen() {
      // This function also runs on reconnection. Drop a wait that has not
      // fired yet, otherwise two listeners would attach and every message
      // would be delivered twice.
      sub?.cancel();

      final waitForConnectedStateWithoutTimeout = (waitForConnection
              ? _connectionStateController
              : _connectionStateController
                  .startWith(SenioSocketConnectionState.connected))
          .where((SenioSocketConnectionState state) =>
              state == SenioSocketConnectionState.connected)
          .take(1);
      final Stream<SenioSocketConnectionState> waitForConnectedState =
          addTimeout
              ? waitForConnectedStateWithoutTimeout.timeout(
                  config.queryAndMutationTimeout!,
                  onTimeout: (EventSink<SenioSocketConnectionState> event) {
                    debugPrint('Connection timed out.');
                    response.addError(
                      TimeoutException('Connection timed out.'),
                    );
                    event.close();
                    response.close();
                  },
                )
              : waitForConnectedStateWithoutTimeout;

      sub = waitForConnectedState.listen((_) {
        // Messages belonging to this operation, for as long as the consumer
        // stream is open.
        final Stream<GraphQLSocketMessage> dataErrorComplete = _messages
            .where((GraphQLSocketMessage message) =>
                (message is SubscriptionData && message.id == id) ||
                (message is SubscriptionError && message.id == id) ||
                (message is SubscriptionComplete && message.id == id))
            .takeWhile((_) => !response.isClosed);

        final Stream<GraphQLSocketMessage> firstComplete = dataErrorComplete
            .where((message) => message is SubscriptionComplete)
            .take(1);
        final Stream<GraphQLSocketMessage> subscriptionComplete = addTimeout
            ? firstComplete.timeout(
                config.queryAndMutationTimeout!,
                onTimeout: (EventSink<GraphQLSocketMessage> event) {
                  debugPrint('Request timed out.');
                  response.addError(TimeoutException('Request timed out.'));
                  event.close();
                  response.close();
                },
              )
            : firstComplete;
        subscriptionComplete.listen((_) => response.close());

        dataErrorComplete
            .whereType<SubscriptionData>()
            .listen((message) => response.add(parse(message.toJson())));
        dataErrorComplete
            .whereType<SubscriptionError>()
            .listen((message) => response.addError(message));

        // Send the start message once per connection.
        final listener = _subscriptionInitializers[id];
        if (listener != null && !listener.hasBeenTriggered) {
          _write(
            StartOperation(
              id,
              serialize(payload),
            ),
          );
          listener.hasBeenTriggered = true;
        }
      });
    }

    response.onListen = onListen;
    response.onCancel = () {
      _subscriptionInitializers.remove(id);
      sub?.cancel();
      if (_connectionStateController.value ==
              SenioSocketConnectionState.connected &&
          socketChannel != null) {
        _write(StopOperation(id));
      }
    };
    _subscriptionInitializers[id] = SubscriptionListener(onListen, false);
    return response.stream;
  }

  /// A stream that emits the last value of the connection state upon
  /// subscription, followed by every later state change.
  Stream<SenioSocketConnectionState> get connectionState =>
      _connectionStateController.stream;
}
/// Default handler for errors raised by the socket message stream: logs the
/// [error] together with its stack trace [st] via `debugPrint`.
void _defaultOnStreamError(Object error, StackTrace st) {
  debugPrint('[SocketClient] message stream encountered error: $error\n'
      'stacktrace:\n$st');
}