grpc-node
grpc-node copied to clipboard
Client streaming retry interceptor
We are implementing a retry interceptor (an intercepting call) so that, when a connection-reset error occurs, the request is made again (ECONNRESET [{"message":"14 UNAVAILABLE: read ECONNRESET"}]).
This retry interceptor works fine for unary requests, but for client-streaming requests we observe that, after the first chunk of data is sent, we receive a status OK and the connection is closed (even though the remaining chunks have not yet been sent).
If we remove the line newCall.halfClose(); from the retry block, the connection hangs after sending the first chunk of data and eventually times out.
Is there an example of a retry mechanism for client/server streaming calls?
/**
 * Client interceptor that re-issues a call when it completes with a non-OK status.
 *
 * The outgoing metadata and the most recently sent request message are saved.
 * When the original call reports a status other than `grpcStatus.OK`, a fresh
 * call is created with `nextCall(options)`, the saved metadata and message are
 * replayed on it, and it is half-closed immediately. This repeats until a retry
 * succeeds or `maxRetries` retries have been made.
 *
 * Limitations visible in this implementation:
 * - Only the LAST message passed to `sendMessage` is saved, and each retry
 *   half-closes right after sending it, so a client-streaming request is not
 *   replayed in full.
 * - During a retry only the last received message is kept, so a retried
 *   response stream forwards a single message.
 *
 * @param options  Call options; `method_definition.responseStream` selects
 *                 whether responses are forwarded as they arrive or held
 *                 until the final status is known.
 * @param nextCall Factory for the next call in the interceptor chain; invoked
 *                 once for the original call and once per retry.
 * @returns An `InterceptingCall` wrapping the next call with the retry requester.
 */
retry_interceptor(options: InterceptorOptions, nextCall: NextCall): any {
  let savedMetadata: Metadata;
  let savedSendMessage: any;
  let savedReceiveMessage: any;
  // Continuation that delivers a response message to the caller. It stays
  // undefined when the original call finished without receiving any message.
  let savedMessageNext: any;
  const maxRetries = 50;
  const requester = (new RequesterBuilder())
    .withStart(function (metadata: Metadata, listener: InterceptingListener,
      next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void {
      savedMetadata = metadata;
      const new_listener = (new ListenerBuilder())
        .withOnReceiveMessage(function (message: any, next: any) {
          savedReceiveMessage = message;
          savedMessageNext = next;
          // for response streaming: forward each message as soon as it arrives
          if (options?.method_definition?.responseStream === true) {
            savedMessageNext(savedReceiveMessage);
          }
        })
        .withOnReceiveStatus(function (status: any, next: any) {
          let retries = 0;
          const retry = function (message: any, metadata: any) {
            retries++;
            const newCall = nextCall(options);
            let receivedMessage: any;
            newCall.start(metadata, {
              onReceiveMessage: function (response: any) {
                receivedMessage = response;
              },
              // Deliberately not `async`: nothing is awaited, and an async
              // callback would turn any thrown error into an unhandled
              // rejection while `next` is never called.
              onReceiveStatus: function (retryStatus: StatusObject) {
                if (retryStatus.code !== grpcStatus.OK) {
                  // `retries` already counts the attempt that just failed, so
                  // `<` allows exactly `maxRetries` retries in total.
                  if (retries < maxRetries) {
                    console.log('Retrying request', { retries, options });
                    retry(message, metadata);
                    return;
                  }
                  // Retries exhausted: surface the last failure to the caller.
                  if (savedMessageNext) {
                    savedMessageNext(receivedMessage);
                  }
                  next(retryStatus);
                  return;
                }
                const new_status: any = (new StatusBuilder())
                  .withCode(grpcStatus.OK).build();
                // NOTE(review): when the original attempt failed before any
                // message arrived there is no saved continuation, so the
                // retried response cannot be forwarded through it; guard
                // instead of throwing a TypeError.
                if (savedMessageNext) {
                  savedMessageNext(receivedMessage);
                }
                next(new_status);
              }
            });
            newCall.sendMessage(message);
            newCall.halfClose();
          };
          if (status.code !== grpcStatus.OK) {
            console.log('Retrying due to status', status);
            retry(savedSendMessage, savedMetadata);
          } else {
            // not to send last chunk twice for response stream
            if (options?.method_definition?.responseStream === false && savedMessageNext) {
              savedMessageNext(savedReceiveMessage);
            }
            next(status);
          }
        }
        ).build();
      next(metadata, new_listener);
    })
    .withSendMessage(function (message, next) {
      // Remember the outgoing message so a retry can replay it.
      savedSendMessage = message;
      next(message);
    }).build();
  return new InterceptingCall(nextCall(options), requester);
};