grpc-node icon indicating copy to clipboard operation
grpc-node copied to clipboard

Client streaming retry interceptor

Open Arun-KumarH opened this issue 3 years ago • 0 comments

We are implementing a retry Intercepting call, so that in case of connection reset error the request is made again (ECONNRESET [{"message":"14 UNAVAILABLE: read ECONNRESET"}]).

This retry interceptor works fine for Unary requests but for client streaming request we observe that after the first chunk of data is sent then we receive a status OK and connection is closed. (although still other chunks are not being sent). From the retry block if we remove the line newCall.halfClose(); then the connection hangs after sending the first chunk of data and connection times out.

Is there an example for client / server stream retry mechanism ?

  retry_interceptor(options: InterceptorOptions, nextCall: NextCall): any {
    let savedMetadata: Metadata;
    let savedSendMessage: any;
    let savedReceiveMessage: any;
    let savedMessageNext: any;
    let maxRetries = 50;
    let requester = (new RequesterBuilder())
      .withStart(function (metadata: Metadata, listener: InterceptingListener,
        next: (metadata: Metadata, listener: InterceptingListener | Listener) => void): void {
        savedMetadata = metadata;
        let new_listener = (new ListenerBuilder())
          .withOnReceiveMessage(function (message: any, next: any) {
            savedReceiveMessage = message;
            savedMessageNext = next;
            // for response streaming
            if (options?.method_definition?.responseStream === true) {
              savedMessageNext(savedReceiveMessage);
            }
          })
          .withOnReceiveStatus(function (status: any, next: any) {
            let retries = 0;
            let retry = function (message: any, metadata: any) {
              retries++;
              let newCall = nextCall(options);
              let receivedMessage: any;
              newCall.start(metadata, {
                onReceiveMessage: function (message: any) {
                  receivedMessage = message;
                },
                onReceiveStatus: async function (status: StatusObject) {
                  if (status.code !== grpcStatus.OK) {
                    if (retries <= maxRetries) {
                      console.log('Retrying request', { retries, options });
                      retry(message, metadata);
                    } else if (savedMessageNext) {
                      savedMessageNext(receivedMessage);
                      next(status);
                    } else {
                      next(status);
                    }
                  } else {
                    let new_status: any = (new StatusBuilder())
                      .withCode(grpcStatus.OK).build();
                    savedMessageNext(receivedMessage);
                    next(new_status);
                  }
                }
              });
              newCall.sendMessage(message);
              newCall.halfClose();
            };
            if (status.code !== grpcStatus.OK) {
              console.log('Retrying due to status', status);
              retry(savedSendMessage, savedMetadata);
            } else {
              // not to send last chunk twice for response stream
              if (options?.method_definition?.responseStream === false) {
                savedMessageNext(savedReceiveMessage);
              }
              next(status);
            }
          }
          ).build();
        next(metadata, new_listener);
      })
      .withSendMessage(function (message, next) {
        savedSendMessage = message;
        next(message);
      }).build();
    return new InterceptingCall(nextCall(options), requester);
  };

Arun-KumarH avatar May 06 '22 12:05 Arun-KumarH