nats.go
nats.go copied to clipboard
Add a context to each incoming Msg.
Hello! This proposal is similar in a way to my other PR #515 which adds Publish middleware. This PR allows for users to implement useful Subscriber middleware.
Problem: Say I want to register a middleware for a Subscriber. With the nats.go package today, I can create one and easily use it to wrap all of my MsgHandler
s as such:
func wrapSubscribe(mh nats.MsgHandler) nats.MsgHandler {
return func(msg *nats.Msg) {
log.Printf("msg received: subject=%s bytes=%d", msg.Subject, len(msg.Data))
mh(msg)
}
}
nc.Subscribe(subj, wrapSubscribe(func(m *nats.Msg) {
fmt.Println("Received async message:", string(m.Data))
}))
The problem though is that there is no way to feed context from the wrapper into the underlying MsgHandler
. This is particularly important for our New Relic users because we want to allow them to be able to access a transaction that was started in the wrapper from inside their handler.
Proposed solution: The easiest way to solve this is by adding a context.Context
field to the Msg
.
Other ideas: I explored the alternative of changing the signature of the MsgHandler
(creating a new MsgHandlerWithContext
) that accepts the context.Context
https://github.com/purple4reina/nats.go/commit/15a583d80ead94f02a98c8b38fb350b1f71ceea9 . I didn't like this as much because it adds yet more methods to the nats.Conn
. Plus, there is no way in this system to propagate context for channel and sync subscribers.
Drawbacks: The one drawback to consider is that with each incoming Msg
there will now be one additional allocation.
Depending on your thoughts, I have a similar PR I can open to stan as well. Thanks again for being so responsive, I look forward to hearing back from you!
Coverage remained the same at 92.097% when pulling 523b8b3b8ea47346ac8c516e3b47be1e7dd279ac on purple4reina:msg-ctx into 6ffcfbcbf08bc73e2158a0685edec6b32c941d38 on nats-io:master.
I guess I understand the want for using context.Context, but if it is really be able to set/get user values to/from the wrapper, would having a Msg.UserValue interface{}
(I am bad at naming) instead? At least there is no mem alloc in processMsg() and user can set something if wanted. But I may be missing something.
@kozlovic agreed, it's not good that I've proposed an additional allocation. I'm considering alternatives. I'm liking your idea of a Msg.UserValue interface{}
. What's nice about go's contexts is that they can allow for any number of key/value pairs. Maybe the solution therefore is to add the Msg.Context context.Context
field but just not initialize it. Then, if a user wants this value to be set, they could do it themselves in their wrappers.
Here's what I'm thinking for a Subscriber wrapper:
func subscribeWrapper(fn nats.MsgHandler) nats.MsgHandler {
return func(msg *nats.Msg) {
if msg.Context == nil {
ctx := context.Background()
msg.Context = context.WithValue(ctx, "mykey", "myvalue")
}
fn(msg)
}
}
Thoughts?
Apologies for not getting back to this sooner. We are looking at this client for some work on generic headers.
I want to introduce a clean way to do middleware, the encoded connection was a bandaid for early on but is not sufficient.