connect-go
Improve support for observability frameworks
Is your feature request related to a problem? Please describe.
Observability plays a critical role in ensuring the reliability, performance, and scalability of services. Presently, while connect-go supports observability through interceptors and HTTP middleware, both solutions have limitations. Interceptors lack specific details like on-the-wire message size and exhibit inconsistent reporting of start times between unary and stream interceptors. On the other hand, while HTTP middleware can offer more precise details, it necessitates redundant protocol parsing to generate comprehensive metrics (such as deducing protocols, parsing envelopes, and handling error messages). Additionally, it fails to capture client-side marshalling time.
Describe the solution you'd like
The objective is to seamlessly integrate observability frameworks and APIs with connect-go. This integration should be framework-agnostic and facilitate the implementation of monitoring, tracing, and logging. It must maintain low overhead and accurately record statistics.
To achieve this, a proposed solution involves introducing an event emitter option called WithObservability to both client and handler options. This option will accept an Observability function, invoked for each new RPC to create a new Observer. These Observers will receive ObserverEvents emitted as the RPC progresses. They may modify the context and headers to propagate essential information to other observers, crucial for tracing frameworks.
Outlined below are the proposed new APIs for Observability:
// WithObservability provides a function that may return an [Observer] to
// observe events during the execution of an RPC. For example, an Observer may
// be used to emit metrics, trace or log events. The observability function may
// modify the context or header metadata to propagate information required by
// other Observers. A nil Observer disables observability for the current
// request.
func WithObservability(observability func(context.Context, Spec, Peer, http.Header) (context.Context, Observer)) Option
// Observer is a function that observes the execution of a request. An Observer
// is called with events that occur during the execution of the request. The
// Observer may modify the context or header metadata to propagate information
// required by other Observers.
type Observer func(ctx context.Context, event ObserverEvent)
// ObserverEvent is an event that occurs during the execution of an RPC.
type ObserverEvent interface {
isObserverEvent()
}
Events will be emitted to the Observer for important flow conditions:
- Retry (optional): emitted when a client request is retried; sent only to the client observer.
- RequestMessage: the client sends a message and the server receives it.
- ResponseMessage: the server sends a message and the client receives it.
- ResponseHeader: the server sends response headers and the client receives them.
- End: emitted before return; includes the wire error and trailers.
Events will be emitted as shown:
sequenceDiagram
Client-->>Observer:Observe Client start
activate Client
Client->>Handler: Request headers
loop Connection
Client->>Handler: Request headers
Client-->>Observer: Retry event
end
Handler-->>Observer:Observe Handler start
activate Handler
par Request
loop
Client->>Handler: Request message
Client-->>Observer: RequestMessage event
Handler-->>Observer: RequestMessage event
end
Client->>Handler: Close request
and Response
Handler->>Client: Response headers
Handler-->>Observer: ResponseHeader event
Client-->>Observer: ResponseHeader event
loop
Handler->>Client: Response message
Client-->>Observer: ResponseMessage event
Handler-->>Observer: ResponseMessage event
end
Handler->>Client: Close response
end
Handler-->>Observer: End event
deactivate Handler
Client-->>Observer: End event
deactivate Client
The full set of events still needs to be defined. Each event carries read-only data and is emitted directly after the corresponding step has occurred. A subset:
// ObserverEventEnd is emitted when the RPC ends.
type ObserverEventEnd struct {
Err *Error // nil if the RPC completed successfully
Trailer http.Header // Trailer metadata
}
func (*ObserverEventEnd) isObserverEvent() {}
// ObserverEventRequestMessage is emitted when a request message is sent or received.
type ObserverEventRequestMessage struct {
Size int // Size of the message on the wire
Codec string // Codec used to encode the message
Compression string // Compression used to encode the message
}
func (*ObserverEventRequestMessage) isObserverEvent() {}
Describe alternatives you've considered
- Modify interceptors to better support observability:
- Difficult to propagate information to the interceptor in a backwards compatible way. Could add information to the context but information like message size on the wire is still unavailable as the Send interceptor is called before marshalling.
- HTTP middleware.
- Investigating this solution was more complex than I'd hoped. Need to handle enveloping and parsing of error payloads for capturing errors.
Additional context
See https://github.com/connectrpc/otelconnect-go for the current interceptor solution.
Example
As an example, below is a rough sketch of how the gRPC A66 metrics proposal could be implemented with a stateful observer function, using the Go OpenTelemetry library:
func GRPCServerObserver(ctx context.Context, spec connect.Spec, peer connect.Peer, header http.Header) (context.Context, connect.Observer) {
	startAt := time.Now()
	attrSet := attribute.NewSet(
		// Procedure has the form "/service/method"; drop the leading slash.
		attribute.String("grpc.method", spec.Procedure[1:]),
	)
	// metrics is assumed to be a package-level struct holding the otel instruments.
	metrics.started.Add(ctx, 1, metric.WithAttributeSet(attrSet))
	var (
		sentTotal int64
		rcvdTotal int64
	)
	return ctx, func(ctx context.Context, event connect.ObserverEvent) {
		switch event := event.(type) {
		case *connect.ObserverEventResponseMessage:
			sentTotal += int64(event.Size)
		case *connect.ObserverEventRequestMessage:
			rcvdTotal += int64(event.Size)
		case *connect.ObserverEventEnd:
			// Err is nil on success, so report OK instead of dereferencing it.
			status := "OK"
			if event.Err != nil {
				status = grpcCanonicalStatusString(event.Err.Code())
			}
			endAttrSet := attribute.NewSet(
				attribute.String("grpc.status", status),
			)
			duration := time.Since(startAt)
			metrics.duration.Record(
				ctx, duration.Seconds(),
				metric.WithAttributeSet(attrSet),
				metric.WithAttributeSet(endAttrSet),
			)
			metrics.rcvdTotal.Record(
				ctx, rcvdTotal,
				metric.WithAttributeSet(attrSet),
				metric.WithAttributeSet(endAttrSet),
			)
			metrics.sentTotal.Record(
				ctx, sentTotal,
				metric.WithAttributeSet(attrSet),
				metric.WithAttributeSet(endAttrSet),
			)
		}
	}
}
I'm broadly in favor of this. It's unfortunate that interceptors can't solve all these use cases, but it's difficult to make them reasonable for low-level, byte-oriented use cases and high-level, struct-oriented use cases. We've spent about a year on otelconnect, and I'm still not thrilled with it. It's missing some important data and it's unreasonably complex - the runtime should do more heavy lifting to make integration with a logging/tracing/metrics backend easier.
Introducing a parallel concept specifically for observability (like gRPC's stats package) is probably our best bet.
Regarding one Observer func callback vs some Observer struct with callbacks:
I think one big benefit of having something like:
type Observer struct {
	ObserveEventResponseMessage func(context.Context, connect.Event)
	...
}
The observer could opt in/out of specific events they'd like to capture, and depending on implementation, if computing one of the Events is relatively expensive, it can be avoided entirely if there's no callback for a specific event.
Whereas with one generic Observer callback, the consumer is responsible for filtering out or ignoring events it doesn't want, but by then the collection has already happened.
So the fundamental difference to me is that the single Observer is a bigger opt-in/opt-out for all observability, whereas a struct with methods gives opt-in/opt-out on a more granular per-event level. Inside connect, you'd have the ability to avoid extraneous time.Now() calls for measuring timing, or allocating an Event struct, etc., if the observer doesn't want that event.
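A minimal sketch of that per-event opt-out, under the assumption that the Observer is a struct of optional function fields. The `sender` type, its `eventsBuilt` counter, and the `EventResponseMessage` fields are hypothetical and exist only to show that no event is built when the callback is nil.

```go
package main

import (
	"fmt"
	"time"
)

// EventResponseMessage is a hypothetical per-message event.
type EventResponseMessage struct {
	Size int
	At   time.Time
}

// Observer holds optional callbacks; a nil field opts out of that event.
type Observer struct {
	ResponseMessage func(EventResponseMessage)
}

// sender stands in for the runtime's send path.
type sender struct {
	observer    Observer
	eventsBuilt int // counts how many events the runtime had to construct
}

func (s *sender) send(payload []byte) {
	// (Writing the payload to the wire is omitted.)
	if s.observer.ResponseMessage == nil {
		return // no time.Now() call and no event allocation
	}
	s.eventsBuilt++
	s.observer.ResponseMessage(EventResponseMessage{Size: len(payload), At: time.Now()})
}

func main() {
	quiet := &sender{} // observer without a ResponseMessage callback
	quiet.send([]byte("hello"))

	var sent int
	loud := &sender{observer: Observer{
		ResponseMessage: func(ev EventResponseMessage) { sent += ev.Size },
	}}
	loud.send([]byte("hello"))
	loud.send([]byte("hi"))

	fmt.Println(quiet.eventsBuilt, loud.eventsBuilt, sent)
}
```

With a single generic callback, the equivalent of `eventsBuilt` would increase for every message regardless of whether the consumer used the event.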
Meeting notes
Met to discuss the above proposal today. Attendees: @mattrobenolt, @akshayjshah, @jhump, @emcfarlane
Summary
All in agreement on the need to streamline integration of observability in connect-go. The main discussion point was the addition to the API surface. Two solutions were discussed to mitigate these concerns:
- Third party package utilizing interceptors and middleware:
  Create a third party package that utilizes interceptors and HTTP middleware in combination to generate the Observer framework. This would use the context to coordinate and capture event data. Otelconnect-go would then be reimplemented using this Observer framework. This has the benefit of no impact to the connect package's API surface. However, it may limit the metrics/tracing we are able to gather.
- Utility sub-packages within connect-go:
  Refactor internals of connect-go to allow for sub-packages to exist within the module. Useful low-level packages for fiddling with the wire format of various protocols could be split out. These would additionally help other projects, like vanguard-go, avoid reimplementing low-level protocol details. Observability could then be created as a sub-package, reducing the impact to the main connect package’s API. Anything exposed that is required to be moved can be aliased in order to avoid circular imports.
Next Steps
- Investigate a POC implementation of the third-party package approach to evaluate its impact on metrics and tracing abilities.
- Start exploring refactoring of packages into internal sub-packages. This would help evaluate the API surface we want to expose and the use cases for it. As a side benefit, this could help clean up logic within the connect package.
Will update this issue as it progresses.
Started the proof of concept for the third party package approach (WIP branch here).
Changes needed
The Observability signature is updated to work with connect client interceptors, which don't have access to the request header or peer object on initialization. To include the span in the client context we must create the observer before the connection is created. Therefore, a new event, EventRequest, is defined. It is sent when the headers are sent, allowing tracing frameworks to set the carrier for the trace.
type Observability func(context.Context, connect.Spec) (context.Context, Observer)
// EventRequest is emitted when an RPC request is started. The header
// may be modified to propagate information required by other Observers.
type EventRequest struct {
Header http.Header
Peer connect.Peer
}
Observer is updated, following @mattrobenolt's suggestion, to be a set of functions:
type Observer struct {
Request func(EventRequest)
Response func(EventResponse)
End func(EventEnd)
}
Events on message send and receive are dropped, because it is difficult to capture message size, encoding, and compression without consuming the bytes in a protocol-aware way. Instead, the count of messages sent and received, along with the total size in bytes, is recorded on the End event:
type EventEnd struct {
Err error // nil if the RPC completed successfully
Trailer http.Header // Trailer metadata
Duration time.Duration
SentTotalBytes int64
ReceivedTotalBytes int64
SentMessageCount int64
ReceivedMessageCount int64
}
Start time is difficult to measure correctly for the client (that is, in a way that captures the marshalling time). For unary calls the interceptor is called after marshalling and before writing to the round tripper. For streaming we could record the time on the Send interceptor, but for consistency with unary we record the time on the header send event.