grpc-dart
Interceptor for handling response
Can we use ClientInterceptor for handling common use cases or errors from the response?
Thanks!
The question is rather ambiguous, so I don't know whether to say yes or no. An interceptor is called before you make the call and can also listen for responses, because it has access to the ResponseFuture/ResponseStream. Whether that's enough for what you are trying to achieve, I don't know; it depends on what exactly you are trying to achieve. If you would like a more concrete answer, you would have to ask a more concrete question.
@len8657 the compilation error you previously posted indicates that you are trying to use gRPC-Web from a native Flutter application. This is not a supported use case currently. Use native gRPC instead (make sure you import package:grpc/grpc.dart and not package:grpc/grpc_web.dart).
Yes!
This answer inspired me. I use $createUnaryCall with protobuf.
I have another problem: the interceptor is called when the single request is sent, and I receive the response data, but the interceptor is not called when the single response is received.
abstract class ClientInterceptor {
  // Intercept unary call.
  // This method is called when client sends single request and receives single response.
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    return invoker(method, request, options);
  }
  // ...
}
My code:
class TPClientInterceptor implements ClientInterceptor {
  final int _id;
  int _unary = 0;
  int _streaming = 0;
  // Shared across all interceptor instances; cleared by tearDown().
  static final List<InterceptorInvocation> _invocations = [];

  TPClientInterceptor(this._id);

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, ++_unary, _streaming));
    print('interceptUnary');
    return invoker(method, request, _inject(options));
  }

  // Adds an "x-interceptor" metadata entry listing the invocations so far.
  CallOptions _inject(CallOptions options) {
    print('_inject');
    return options.mergedWith(CallOptions(metadata: {
      "x-interceptor": _invocations.map((i) => i.toString()).join(', '),
    }));
  }

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    _invocations.add(InterceptorInvocation(_id, _unary, ++_streaming));
    print('interceptStreaming');
    return invoker(method, requests, _inject(options));
  }

  static void tearDown() {
    _invocations.clear();
  }
}

static HousePlanServicesClient getHousePlanServicesClient() {
  var channel = RPCHelper.tpClientChannel;
  Iterable<ClientInterceptor> interceptors = [
    TPClientInterceptor(1),
  ];
  HousePlanServicesClient hc =
      HousePlanServicesClient(channel, interceptors: interceptors);
  return hc;
}
@len8657 You need to listen to ResponseFuture/ResponseStream which invoker returns to intercept the response. e.g. instead of return invoker(method, request, _inject(options)); you can do
final response = invoker(method, request, _inject(options));
response.then((r) {
  print('got response: $r');
});
return response;
Though I see that ResponseFuture breaks composability here because ResponseFuture.then returns a Future and not ResponseFuture, which means you can't write
return invoker(...).then(transformResponse);
I think I would be happy to take a PR that fixes ResponseFuture.then to return ResponseFuture to enable this sort of interception.
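To make the composability problem concrete, here is a minimal sketch that uses only dart:async. CancellableFuture and tap are hypothetical names invented for this illustration; they stand in for ResponseFuture and for an interceptor body, and are not part of package:grpc. The sketch shows why chaining with then loses the call handle, and why listening as a side effect and returning the original object keeps it. It also passes an onError callback to the side listener, because as far as I understand the future created by then would otherwise report the call's error as unhandled.

```dart
import 'dart:async';

/// Hypothetical stand-in for a call handle such as ResponseFuture:
/// a Future that also exposes cancel().
class CancellableFuture<T> implements Future<T> {
  final Future<T> _inner;
  bool cancelled = false;

  CancellableFuture(this._inner);

  void cancel() => cancelled = true;

  @override
  Stream<T> asStream() => _inner.asStream();

  @override
  Future<T> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      _inner.catchError(onError, test: test);

  // Like any Future, then() returns a plain Future, so cancel() is no
  // longer reachable on the result.
  @override
  Future<S> then<S>(FutureOr<S> Function(T value) onValue,
          {Function? onError}) =>
      _inner.then(onValue, onError: onError);

  @override
  Future<T> timeout(Duration timeLimit,
          {FutureOr<T> Function()? onTimeout}) =>
      _inner.timeout(timeLimit, onTimeout: onTimeout);

  @override
  Future<T> whenComplete(FutureOr<void> Function() action) =>
      _inner.whenComplete(action);
}

/// Observes the result as a side effect and returns the ORIGINAL handle,
/// so the caller can still call cancel().
CancellableFuture<T> tap<T>(
    CancellableFuture<T> call, void Function(T value) onValue) {
  // The empty onError keeps the side listener from surfacing the call's
  // error a second time; the caller still sees it on the returned handle.
  call.then(onValue, onError: (Object _) {});
  return call;
}
```

With this shape, tap(call, print) can be returned from an interceptor-like function, whereas call.then(print) cannot, because its static type is Future rather than the handle type.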
I should have mentioned this issue when asking the question. I thought there might be a different way to do it, since ResponseFuture.then returns a Future; that's why I asked whether we can use ClientInterceptor to handle the response. Apparently the answer is ~no~ not yet. I can work on the PR.
This is working for me
@mraleph I just submitted a PR for this at #419. Let me know what you think.
Now that I look at this with fresh eyes, I think that instead of trying to allow ResponseFuture chaining we could just split interceptors into interceptors that handle call transformation and interceptors that handle response transformation, e.g. something along the lines of:
abstract class ClientInterceptor {
  // Intercept unary call.
  // This method is called when client sends single request and receives single response.
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker);

  Future<R> interceptUnaryResponse<R>(ClientMethod<dynamic, R> method, Future<R> response);

  // Intercept streaming call.
  // This method is called when client sends either request or response stream.
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker);

  Stream<R> interceptStreamingResponse<R>(ClientMethod<dynamic, R> method, Stream<R> response);
}
Then we don't need to deal with ResponseFuture gnarliness at all.
Would that be sufficient for user purposes?
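As a rough illustration of what response-side hooks could look like from the user's side, here is a sketch that needs only dart:async. Everything in it is hypothetical: ResponseHook, RecordingHook and applyUnaryHooks are invented names, and the method is identified by a plain path string instead of ClientMethod so that the example needs no gRPC types. It is not the proposed API itself, only the same idea: because the hooks take and return plain Future and Stream values, ordinary async/await and async* compose without any special handle type.

```dart
import 'dart:async';

/// Hypothetical response-side hook. The defaults pass responses through.
abstract class ResponseHook {
  Future<R> onUnaryResponse<R>(String path, Future<R> response) => response;

  Stream<R> onStreamingResponse<R>(String path, Stream<R> response) =>
      response;
}

/// Records every value and every error, then passes them on unchanged.
class RecordingHook extends ResponseHook {
  final List<String> log = [];

  @override
  Future<R> onUnaryResponse<R>(String path, Future<R> response) async {
    try {
      final value = await response;
      log.add('$path ok: $value');
      return value;
    } catch (e) {
      log.add('$path error: $e');
      rethrow; // The caller still sees the original error.
    }
  }

  @override
  Stream<R> onStreamingResponse<R>(String path, Stream<R> response) async* {
    try {
      await for (final value in response) {
        log.add('$path item: $value');
        yield value;
      }
    } catch (e) {
      log.add('$path error: $e');
      rethrow;
    }
  }
}

/// Applies hooks in list order, the way a client might after invoking a call.
Future<R> applyUnaryHooks<R>(
        List<ResponseHook> hooks, String path, Future<R> response) =>
    hooks.fold<Future<R>>(
        response, (r, hook) => hook.onUnaryResponse<R>(path, r));
```

One trade-off this sketch makes visible: the hooks never see headers, trailers or cancel(), so any use case that needs those would still have to go through the call-side interceptor.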
Something like this might be a solution: https://github.com/grpc/grpc-dart/pull/548
Workaround & example code (not thoroughly tested):
import 'dart:async';

import 'package:async/async.dart';
import 'package:grpc/grpc_or_grpcweb.dart';
import 'package:grpc/service_api.dart';

class LogoutInterceptor implements ClientInterceptor {
  final void Function() onLoggedOutDetected;

  LogoutInterceptor({required this.onLoggedOutDetected});

  @override
  ResponseStream<R> interceptStreaming<Q, R>(
      ClientMethod<Q, R> method,
      Stream<Q> requests,
      CallOptions options,
      ClientStreamingInvoker<Q, R> invoker) {
    return DelegatingResponseStream<R>(invoker(method, requests, options))
        .handleError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => (e is GrpcError) && e.code == StatusCode.unauthenticated);
  }

  @override
  ResponseFuture<R> interceptUnary<Q, R>(ClientMethod<Q, R> method, Q request,
      CallOptions options, ClientUnaryInvoker<Q, R> invoker) {
    return DelegatingResponseFuture<R>(invoker(method, request, options))
        .catchError((Object error) {
      onLoggedOutDetected();
    }, test: (e) => e is GrpcError && e.code == StatusCode.unauthenticated);
  }
}

class DelegatingResponseFuture<R> extends DelegatingFuture<R>
    implements ResponseFuture<R> {
  Response responseDelegate;

  DelegatingResponseFuture.split(
      this.responseDelegate, Future<R> futureDelegate)
      : super(futureDelegate);

  DelegatingResponseFuture(ResponseFuture<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseStream<R> asStream() =>
      DelegatingResponseStream.split(responseDelegate, super.asStream());

  @override
  ResponseFuture<R> catchError(Function onError,
          {bool Function(Object error)? test}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.catchError(onError, test: test));

  @override
  ResponseFuture<S> then<S>(FutureOr<S> Function(R) onValue,
          {Function? onError}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.then(onValue, onError: onError));

  @override
  ResponseFuture<R> whenComplete(FutureOr Function() action) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.whenComplete(action));

  @override
  ResponseFuture<R> timeout(Duration timeLimit,
          {FutureOr<R> Function()? onTimeout}) =>
      DelegatingResponseFuture.split(
          responseDelegate, super.timeout(timeLimit, onTimeout: onTimeout));

  @override
  Future<void> cancel() {
    return responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}

class DelegatingResponseStream<R> extends DelegatingStream<R>
    implements ResponseStream<R> {
  Response responseDelegate;

  DelegatingResponseStream.split(
      this.responseDelegate, Stream<R> streamDelegate)
      : super(streamDelegate);

  DelegatingResponseStream(ResponseStream<R> delegate)
      : this.split(delegate, delegate);

  @override
  ResponseFuture<R> get single =>
      DelegatingResponseFuture.split(responseDelegate, super.single);

  @override
  ResponseStream<R> handleError(Function onError,
      {bool Function(dynamic error)? test}) {
    return DelegatingResponseStream.split(
        responseDelegate, super.handleError(onError, test: test));
  }

  @override
  Future<void> cancel() async {
    await responseDelegate.cancel();
  }

  @override
  Future<Map<String, String>> get headers => responseDelegate.headers;

  @override
  Future<Map<String, String>> get trailers => responseDelegate.trailers;
}
(Should be OK to add any other Stream methods as needed.)
In my initial tests it works for Futures. For Streams, however, if the token has expired the listeners are cancelled; the refreshToken request is made as expected, but the listeners don't restart.
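One possible way to approach the stream case is to reopen the call after the failure instead of expecting the old subscription to resume. The sketch below uses only dart:async; reopenOnError and its parameters are hypothetical names, not part of package:grpc, and it assumes the call is safe to issue again from the start. The open callback would create a fresh streaming call, shouldRetry would test for something like an unauthenticated error, and beforeRetry would be the place to await the token refresh.

```dart
import 'dart:async';

/// Reopens a stream when it fails with an error accepted by [shouldRetry].
/// Hypothetical helper: values already delivered are not replayed or
/// de-duplicated, and the new call starts from the beginning.
Stream<T> reopenOnError<T>(
  Stream<T> Function() open, {
  required bool Function(Object error) shouldRetry,
  Future<void> Function()? beforeRetry,
  int maxAttempts = 3,
}) async* {
  for (var attempt = 1;; attempt++) {
    try {
      // `await for` turns a stream error into an exception we can catch.
      await for (final value in open()) {
        yield value;
      }
      return; // The stream completed normally.
    } catch (error) {
      if (attempt >= maxAttempts || !shouldRetry(error)) rethrow;
      if (beforeRetry != null) await beforeRetry();
    }
  }
}
```

The listener subscribes once to the returned stream and keeps receiving values across reopen attempts; whether that is acceptable depends on whether the server-side call can be repeated.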
Summarizing this issue and related issues a bit.
- There are a couple use cases discussed. Notably: Amending calls (e.g. adding options), amending responses (no concrete example above), adding additional behaviors on certain calls and responses, including error responses (e.g. informing central application code on any authentication error to allow for recovery from a session expiry).
- There are two potential API changes discussed: Modifying ResponseFuture classes for smoother composability, or a bigger change to update the API to split request and response handling.
- There are a couple workarounds for different use cases: https://github.com/grpc/grpc-dart/issues/413#issuecomment-742405324, https://github.com/grpc/grpc-dart/pull/489#issuecomment-1117204933, and https://github.com/grpc/grpc-dart/issues/413#issuecomment-1277849117, with the latter currently broken in the latest release (https://github.com/grpc/grpc-dart/commit/d9553ca73f66116f7ad14fff5d0e4814253311a4#r118779797)
- Chaining creates API issues. (https://github.com/grpc/grpc-dart/pull/419#issuecomment-854542925)
- There's an uncommitted fix: https://github.com/grpc/grpc-dart/pull/548
@mraleph @kevmoo - any ideas on anything on the community side or otherwise that could unblock this issue overall?