connect-go
connect-go copied to clipboard
How to implement a "full-stream" interceptor?
Is your feature request related to a problem? Please describe.
I'm struggling to understand how to implement a logging/tracing interceptor for the full Client/Server side streams.
Something similar to how it was done with grpc-go, example for Elasticsearch's APM tracing: https://github.com/elastic/apm-agent-go/blob/main/module/apmgrpc/server.go#L111
If I embed the tracing-information using WrapStreamContext (which is also missing something like Spec, to be able to identify which stream is called), how would I go about closing the transaction on end?
Describe the solution you'd like I'd like a solution that allows for tracking the duration and result of streams, similar to how it's possible for unary calls.
Thanks for the feedback, @bendiktv2!
With the current API, you'd close the transaction in the Close method on your Sender or Receiver implementation. Unless you're also attaching a tracer or some similar struct to the context, I don't think you'd need any logic in WrapStreamContext at all.
However, this API isn't as easy-to-use as it should be. We'll rethink the stream interceptor APIs a bit as a result of #296, which will hopefully clear up some of the confusion. We're also in the process of writing OpenCensus and OpenTracing interceptors, which should be good examples of how to do this sort of thing.
Great job on the refactoring of the server-side stream-interceptors. I still feel the client-side stream-interceptors' API is very clunky to work with.
For instance, to create a APM tracing interceptor for the client-side, I had to do the following:
The Unary interceptor is really simple:
func (a *apmClientInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
if !a.tracer.Recording() {
return next(ctx, req)
}
span, ctx := startSpan(ctx, req.Spec(), req.Header())
if span != nil {
defer span.End()
}
resp, err := next(ctx, req)
if span != nil {
setSpanResult(span, err, req.Spec(), req.Header())
}
return resp, err
})
}
But to do the same for the client-streaming version, was not pleasant.
This is what I have come up with:
func (a *apmClientInterceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
if !a.tracer.Recording() {
return next(ctx, spec)
}
span, ctx, traceParent, traceState := startStreamingSpan(ctx, spec) // Start new span, add it to ctx, and get string-versions of the two headers we need
return &apmSpanningClientConn{
StreamingClientConn: next(ctx, spec),
requestCtx: ctx,
spec: spec,
span: span,
traceParent: traceParent,
traceState: traceState,
setHeadersOnce: sync.Once{},
endSpanOnce: sync.Once{},
}
}
}
And then I had to "override" almost all ClientConn-functions:
func (cc *apmSpanningClientConn) RequestHeader() http.Header {
headers := cc.StreamingClientConn.RequestHeader()
cc.setHeadersOnce.Do(func() {
if cc.traceParent != "" {
headers.Set(w3cTraceparentHeader, cc.traceParent)
if cc.traceState != "" {
headers.Set(tracestateHeader, cc.traceState)
}
}
})
return headers
}
func (cc *apmSpanningClientConn) Send(r any) error {
err := cc.StreamingClientConn.Send(r)
cc.endSpanIfErr(err, false)
return err
}
func (cc *apmSpanningClientConn) Receive(r any) error {
err := cc.StreamingClientConn.Receive(r)
cc.endSpanIfErr(err, false)
return err
}
func (cc *apmSpanningClientConn) CloseRequest() error {
err := cc.StreamingClientConn.CloseRequest()
cc.endSpanIfErr(err, false)
return err
}
func (cc *apmSpanningClientConn) CloseResponse() error {
err := cc.StreamingClientConn.CloseResponse()
cc.endSpanIfErr(err, true)
return err
}
func (cc *apmSpanningClientConn) endSpanIfErr(err error, force bool) {
if cc.span == nil {
return
}
if errors.Is(err, io.EOF) {
// For apm-purposes, io.EOF is not an error.
err = nil
}
if err != nil || force {
cc.endSpanOnce.Do(func() {
setSpanResult(cc.span, err, cc.spec, cc.RequestHeader())
cc.span.End()
})
}
}
And I might still have some race-conditions, such as if the Response-side of the stream is closed (without error), before the Request-side of the stream is closed/sent (with error).
So it is not just very verbose, it is also very error-prone. I hope you will consider a second pass of the client-side interceptor API, or add some sort of convenience-wrapper over the whole StreamingClientFunc.
Everything about clients is intrinsically more complex than handlers, because they're always working with futures: no data has been received or sent until the user starts calling methods. I'd love to make this easier, but I'm not sure how to do so in a way that works for a variety of use cases. Do you have any suggestions?
It looks to me like you're using Elastic's APM client. By way of comparison, the code you've written seems quite a bit simpler than Elastic's interceptor for streaming grpc-go clients: you just need the wrapper struct, and don't need to be managing goroutines yourself.
Fun fact: I had not actually looked at Elastic's client-interceptor myself, as it was provided by Elastic, and simply "worked", and I had mostly been looking at the server-interceptor.
For my case (add tracing to a stream), or for a case like logging the duration and result of a call, I guess it boils down to:
- Do any pre-request work (like starting a span, or storing the start-time)
- Override a call to RequestHeaders() to be able to add new headers to the request.
- Store the first non-nill error from any of the error-producing functions (Send, Receive, CloseRequest, CloseResponse)
- After both CloseRequest and CloseResponse has been called, do the cleanup-work (inspect the stored error, and close the span, or log the result)
I guess some sort of helper-interceptor could be created in the connect-lib to allow for this exact pattern, as I guess this is a pattern that would repeat in a few different types of interceptors.
Appreciate the feedback, @bendiktv2! I can see the pattern you're describing, but the observability clients I've seen vary widely - I can imagine many patterns like this. Rather than putting a variety of interceptor-constructing helpers into the connect-go package, I'd prefer to rely on third-party packages. If one or two patterns are overwhelmingly common, we can pull them into connect-go later on.
I do see utility in the wrapper you're suggesting, though - perhaps you'd like to publish it under your own Github username for now?
As far as I know, the concrete issue here is resolved. We're going to use #344 as our example streaming interceptor - @bendiktv2, please subscribe to that issue if you'd like updates!