Interceptor for handling response

Open rockerhieu opened this issue 4 years ago • 14 comments

Can we use ClientInterceptor for handling common use cases or errors from the response?

rockerhieu avatar Dec 09 '20 20:12 rockerhieu

Thanks!

len8657 avatar Dec 10 '20 07:12 len8657

The question is rather ambiguous, so I don't know whether to say yes or no. An interceptor is called before you make the call and can also listen for responses, because it has access to the ResponseFuture/ResponseStream. Whether that's enough depends on what exactly you are trying to achieve. If you would like a more concrete answer, you would have to ask a more concrete question.

@len8657 the compilation error you previously posted indicates that you are trying to use gRPC-Web from a native Flutter application. This is not a supported use case currently. Use native gRPC instead (make sure you import package:grpc/grpc.dart and not package:grpc/grpc_web.dart).

mraleph avatar Dec 10 '20 08:12 mraleph

Yes! This answer inspired me. I use $createUnaryCall with protobuf.

I have another problem: the interceptor is called when the single request is sent, and I do receive the response data, but the interceptor is not called when the single response is received.

abstract class ClientInterceptor {
  // Intercept unary call.
  // This method is called when client sends single request and receives single response.
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    return invoker(method, request, options);
  }
  // ... rest of the class omitted
}

my code

class TPClientInterceptor implements ClientInterceptor {
  final int _id;
  int _unary = 0;
  int _streaming = 0;

  // A list literal replaces `new List()`, which is not available in null-safe Dart.
  static final List<InterceptorInvocation> _invocations = [];

  TPClientInterceptor(this._id);

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, ++_unary, _streaming));
    print('interceptUnary');
    return invoker(method, request, _inject(options));
  }

  CallOptions _inject(CallOptions options) {
    print('_inject');
    return options.mergedWith(CallOptions(metadata: {
      "x-interceptor": _invocations.map((i) => i.toString()).join(', '),
    }));
  }

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, _unary, ++_streaming));
    print('interceptStreaming');

    return invoker(method, requests, _inject(options));
  }

  static void tearDown() {
    _invocations.clear();
  }
}

  static HousePlanServicesClient getHousePlanServicesClient() {
    var channel = RPCHelper.tpClientChannel;
    Iterable<ClientInterceptor> interceptors = [
      TPClientInterceptor(1),
    ];
    HousePlanServicesClient hc =
        HousePlanServicesClient(channel, interceptors: interceptors);
    return hc;
  }

len8657 avatar Dec 10 '20 09:12 len8657

@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do

  final response = invoker(method, request, _inject(options));
  response.then((r) {
    print('got response: $r');
  });
  return response;

Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write

  return invoker(...).then(transformResponse);

I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.
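To illustrate the composability gap described above, here is a minimal, self-contained sketch. `FakeResponseFuture` is a hypothetical stand-in written only for this illustration (it is not the grpc-dart class); it mimics a future that carries one extra member, `cancel()`, and forwards the rest of the `Future` interface unchanged.

```dart
import 'dart:async';

/// Hypothetical stand-in for a future that also carries call controls.
/// Not the real grpc-dart ResponseFuture.
class FakeResponseFuture<T> implements Future<T> {
  FakeResponseFuture(this._inner);

  final Future<T> _inner;
  bool cancelled = false;

  /// Extra member that a plain Future does not have.
  Future<void> cancel() async {
    cancelled = true;
  }

  // The Future interface is forwarded unchanged, so every chaining method
  // returns a plain Future (or Stream) and the extra member is lost.
  @override
  Future<S> then<S>(FutureOr<S> Function(T value) onValue,
          {Function? onError}) =>
      _inner.then(onValue, onError: onError);

  @override
  Future<T> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      _inner.catchError(onError, test: test);

  @override
  Future<T> whenComplete(FutureOr<void> Function() action) =>
      _inner.whenComplete(action);

  @override
  Stream<T> asStream() => _inner.asStream();

  @override
  Future<T> timeout(Duration timeLimit,
          {FutureOr<T> Function()? onTimeout}) =>
      _inner.timeout(timeLimit, onTimeout: onTimeout);
}

Future<void> main() async {
  final call = FakeResponseFuture<int>(Future.value(41));

  // Side-listening leaves `call` intact, so it can be returned as-is.
  call.then((value) => print('got response: $value'));

  // Chaining produces a plain Future: `chained.cancel()` would not compile.
  final Future<int> chained = call.then((value) => value + 1);
  print(chained is FakeResponseFuture<int>); // false
  print(await chained);
}
```

Side-listening works because the original object is returned untouched; transforming the response would require the chained result to be wrapped again in the richer type.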

mraleph avatar Dec 10 '20 09:12 mraleph

I should have mentioned this issue when asking the question. I thought there might be a different way to do it, since ResponseFuture.then returns a Future; that's why I asked whether we can use ClientInterceptor to handle the response. Apparently the answer is not "no" but "not yet". I can work on the PR.

rockerhieu avatar Dec 10 '20 17:12 rockerhieu

This is working for me

len8657 avatar Dec 11 '20 02:12 len8657

@mraleph I just submitted a PR for this at #419. Let me know what you think.

rockerhieu avatar Jan 08 '21 22:01 rockerhieu

Now that I look at this with fresh eyes, I think that instead of trying to allow ResponseFuture chaining we could just split interceptors into interceptors which handle call transformation and interceptors which handle response transformation, e.g. something along the lines of:

abstract class ClientInterceptor {
  // Intercept unary call.
  // This method is called when client sends single request and receives single response.
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker);

  Future<R> interceptUnaryResponse<R>(ClientMethod<dynamic, R> method, Future<R> response);

  // Intercept streaming call.
  // This method is called when client sends either request or response stream.
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker);

  Stream<R> interceptStreamingResponse<R>(ClientMethod<dynamic, R> method, Stream<R> response);
}

Then we don't need to deal with ResponseFuture gnarliness at all.

Would that be sufficient for user purposes?
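As a rough sketch of how user code might look under the split proposed above: the `ResponseInterceptor` interface below is hypothetical and is declared locally (it is not an existing grpc-dart API), and `methodPath` is an assumed stand-in for the `ClientMethod` argument so that the example needs no package imports.

```dart
import 'dart:async';

/// Hypothetical interface mirroring the response half of the proposal.
/// `methodPath` stands in for the ClientMethod argument. This is not an
/// existing grpc-dart API.
abstract class ResponseInterceptor {
  Future<R> interceptUnaryResponse<R>(String methodPath, Future<R> response);
  Stream<R> interceptStreamingResponse<R>(
      String methodPath, Stream<R> response);
}

/// Reports every failed call to [onError] and passes the error through.
class ErrorReportingInterceptor implements ResponseInterceptor {
  ErrorReportingInterceptor(this.onError);

  final void Function(String methodPath, Object error) onError;

  @override
  Future<R> interceptUnaryResponse<R>(
      String methodPath, Future<R> response) async {
    try {
      return await response;
    } catch (error) {
      onError(methodPath, error);
      rethrow; // The caller still observes the original error.
    }
  }

  @override
  Stream<R> interceptStreamingResponse<R>(
      String methodPath, Stream<R> response) {
    return response.transform(StreamTransformer<R, R>.fromHandlers(
      handleError: (Object error, StackTrace stackTrace, EventSink<R> sink) {
        onError(methodPath, error);
        sink.addError(error, stackTrace); // Forward instead of swallowing.
      },
    ));
  }
}

Future<void> main() async {
  final interceptor = ErrorReportingInterceptor(
      (path, error) => print('$path failed: $error'));

  final value = await interceptor.interceptUnaryResponse(
      '/demo.Service/Get', Future.value('ok'));
  print(value);

  try {
    await interceptor.interceptUnaryResponse<String>(
        '/demo.Service/Get', Future.error(StateError('boom')));
  } on StateError {
    print('caller saw the error too');
  }
}
```

Because these methods take and return plain `Future`/`Stream` values, ordinary `async`/`await` and stream transformers are enough, and nothing has to preserve the `ResponseFuture` type.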

mraleph avatar May 02 '22 07:05 mraleph

Something like this might be a solution: https://github.com/grpc/grpc-dart/pull/548

mraleph avatar May 02 '22 08:05 mraleph

Workaround & example code (not thoroughly tested):

import 'dart:async';

import 'package:async/async.dart';

import 'package:grpc/grpc_or_grpcweb.dart';
import 'package:grpc/service_api.dart';

class LogoutInterceptor implements ClientInterceptor {
  final void Function() onLoggedOutDetected;

  LogoutInterceptor({required this.onLoggedOutDetected});

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    return DelegatingResponseStream<R>(invoker(method, requests, options))
        .handleError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => (e is GrpcError) && e.code == StatusCode.unauthenticated);
  }

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
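    return DelegatingResponseFuture<R>(invoker(method, request, options))
        .catchError((Object error) {
      onLoggedOutDetected();
      // Rethrow so the caller still sees the original GrpcError; a handler
      // that returns nothing would fail at runtime when R is non-nullable.
      throw error;
    }, test: (e) => e is GrpcError && e.code == StatusCode.unauthenticated);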
  }
}

class DelegatingResponseFuture<R> extends DelegatingFuture<R>
    implements ResponseFuture<R> {
  Response responseDelegate;

  DelegatingResponseFuture.split(
      this.responseDelegate, Future<R> futureDelegate)
      : super(futureDelegate);

  DelegatingResponseFuture(ResponseFuture<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseStream<R> asStream() =>
      DelegatingResponseStream.split(responseDelegate, super.asStream());

  @override
  ResponseFuture<R> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.catchError(onError, test: test));

  @override
  ResponseFuture<S> then<S>(FutureOr<S> Function(R) onValue,
          {Function? onError}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.then(onValue, onError: onError));

  @override
  ResponseFuture<R> whenComplete(FutureOr Function() action) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.whenComplete(action));

  @override
  ResponseFuture<R> timeout(Duration timeLimit,
          {FutureOr<R> Function()? onTimeout}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.timeout(timeLimit, onTimeout: onTimeout));

  @override
  Future<void> cancel() {
    return responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

class DelegatingResponseStream<R> extends DelegatingStream<R>
    implements ResponseStream<R> {
  Response responseDelegate;

  DelegatingResponseStream.split(
      this.responseDelegate, Stream<R> streamDelegate)
      : super(streamDelegate);

  DelegatingResponseStream(ResponseStream<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseFuture<R> get single =>
      DelegatingResponseFuture.split(responseDelegate, super.single);

  @override
  ResponseStream<R> handleError(Function onError,
      {bool Function(dynamic error)? test}) {
    return DelegatingResponseStream.split(
        responseDelegate, super.handleError(onError, test: test));
  }

  @override
  Future<void> cancel() async {
    await responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

(Should be OK to add any other Stream methods as needed.)
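One detail worth checking when adapting `catchError`-based interception like the unary case above: a handler that returns nothing tries to complete the future with null. The sketch below uses only `dart:async`; the comment about the resulting error type reflects my understanding of the Dart VM with sound null safety and is an assumption worth verifying on your target platform.

```dart
import 'dart:async';

Future<void> main() async {
  // The handler returns nothing, so it tries to complete a Future<int>
  // with null.
  final swallowed = Future<int>.error(StateError('unauthenticated'))
      .catchError((Object error) {
    print('handler ran');
  });

  try {
    await swallowed;
  } catch (error) {
    // On the Dart VM this is expected to be an ArgumentError about the
    // handler's return value rather than the original StateError.
    print(error.runtimeType);
  }

  // Rethrowing from the handler keeps the original error for the caller.
  final rethrown = Future<int>.error(StateError('unauthenticated'))
      .catchError((Object error) {
    print('handler ran');
    throw error;
  });

  try {
    await rethrown;
  } on StateError catch (error) {
    print('caller still sees: ${error.message}');
  }
}
```

For a side effect such as a logout notification, rethrowing keeps the call's failure visible to the original caller.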

aran avatar Oct 13 '22 16:10 aran

In my initial tests it works. With Streams, however, if the token has expired the listeners are cancelled and the refreshToken request is called normally, but the listeners don't restart. With Futures it works fine.
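One possible way to handle the stream case is to re-subscribe at the call site, since a stream that has ended with an error does not restart by itself. The helper below is a hypothetical sketch using only `dart:async`; `resubscribeOnError`, `startCall`, `shouldRetry` and `recover` are names invented for this illustration and are not part of grpc-dart.

```dart
import 'dart:async';

/// Re-subscribes to a stream when it fails with a retryable error.
///
/// All names are hypothetical: [startCall] must create a fresh stream each
/// time (for example by invoking the generated client method again),
/// [shouldRetry] decides whether an error is recoverable, and [recover]
/// runs before the next attempt (for example a token refresh).
Stream<T> resubscribeOnError<T>(
  Stream<T> Function() startCall, {
  required bool Function(Object error) shouldRetry,
  required Future<void> Function() recover,
  int maxRetries = 1,
}) async* {
  var retries = 0;
  while (true) {
    try {
      // `await for` surfaces stream errors as exceptions, unlike `yield*`,
      // which would forward them to the listener without a chance to retry.
      await for (final message in startCall()) {
        yield message;
      }
      return; // The call completed normally.
    } catch (error) {
      if (!shouldRetry(error) || retries >= maxRetries) rethrow;
      retries++;
      await recover();
    }
  }
}

Future<void> main() async {
  var attempt = 0;

  // Fake call: the first attempt fails after one message, the second works.
  Stream<int> fakeCall() async* {
    attempt++;
    yield attempt * 10;
    if (attempt == 1) throw StateError('token expired');
    yield attempt * 10 + 1;
  }

  final messages = await resubscribeOnError<int>(
    fakeCall,
    shouldRetry: (error) => error is StateError,
    recover: () async => print('refreshing token'),
  ).toList();

  print(messages); // [10, 20, 21]
}
```

With grpc-dart, `shouldRetry` could test for a `GrpcError` whose code is `StatusCode.unauthenticated`. Messages delivered before the failure are not replayed or deduplicated, so this only suits calls that are safe to restart.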

tihrasguinho avatar Apr 04 '23 23:04 tihrasguinho

Summarizing this issue and related issues a bit.

  1. There are a few use cases discussed. Notably: amending calls (e.g. adding options), amending responses (no concrete example above), and adding additional behaviors on certain calls and responses, including error responses (e.g. informing central application code of any authentication error to allow recovery from a session expiry).
  2. There are two potential API changes discussed: modifying the ResponseFuture classes for smoother composability, or a bigger change that splits request and response handling in the API.
  3. There are a few workarounds for different use cases: https://github.com/grpc/grpc-dart/issues/413#issuecomment-742405324, https://github.com/grpc/grpc-dart/pull/489#issuecomment-1117204933, and https://github.com/grpc/grpc-dart/issues/413#issuecomment-1277849117, with the last one currently broken in the latest release (https://github.com/grpc/grpc-dart/commit/d9553ca73f66116f7ad14fff5d0e4814253311a4#r118779797).
  4. Chaining creates API issues. (https://github.com/grpc/grpc-dart/pull/419#issuecomment-854542925)
  5. There's an uncommitted fix: https://github.com/grpc/grpc-dart/pull/548

@mraleph @kevmoo - any ideas on anything on the community side or otherwise that could unblock this issue overall?

aran avatar Jun 19 '23 17:06 aran