nats.go icon indicating copy to clipboard operation
nats.go copied to clipboard

Add a context to each incoming Msg.

Open purple4reina opened this issue 5 years ago • 4 comments

Hello! This proposal is similar in a way to my other PR #515 which adds Publish middleware. This PR allows for users to implement useful Subscriber middleware.

Problem: Say I want to register a middleware for a Subscriber. With the nats.go package today, I can create one and easily use it to wrap all of my MsgHandlers as such:

func wrapSubscribe(mh nats.MsgHandler) nats.MsgHandler {
    return func(msg *nats.Msg) {
        log.Printf("msg received: subject=%s bytes=%d", msg.Subject, len(msg.Data))
        mh(msg)
    }   
}

nc.Subscribe(subj, wrapSubscribe(func(m *nats.Msg) {
    fmt.Println("Received async message:", string(m.Data))
}))

The problem though is that there is no way to feed context from the wrapper into the underlying MsgHandler. This is particularly important for our New Relic users because we want to allow them to be able to access a transaction that was started in the wrapper from inside their handler.

Proposed solution: The easiest way to solve this is by adding a context.Context field to the Msg.

Other ideas: I explored the alternative of changing the signature of the MsgHandler (creating a new MsgHandlerWithContext) that accepts the context.Context https://github.com/purple4reina/nats.go/commit/15a583d80ead94f02a98c8b38fb350b1f71ceea9 . I didn't like this as much because it adds yet more methods to the nats.Conn. Plus, there is no way in this system to propagate context for channel and sync subscribers.

Drawbacks: The one drawback to consider is that with each incoming Msg there will now be one additional allocation.

Depending on your thoughts, I have a similar PR I can open to stan as well. Thanks again for being so responsive, I look forward to hearing back from you!

purple4reina avatar Sep 04 '19 22:09 purple4reina

Coverage Status

Coverage remained the same at 92.097% when pulling 523b8b3b8ea47346ac8c516e3b47be1e7dd279ac on purple4reina:msg-ctx into 6ffcfbcbf08bc73e2158a0685edec6b32c941d38 on nats-io:master.

coveralls avatar Sep 04 '19 23:09 coveralls

I guess I understand the want for using context.Context, but if it is really be able to set/get user values to/from the wrapper, would having a Msg.UserValue interface{} (I am bad at naming) instead? At least there is no mem alloc in processMsg() and user can set something if wanted. But I may be missing something.

kozlovic avatar Sep 04 '19 23:09 kozlovic

@kozlovic agreed, it's not good that I've proposed an additional allocation. I'm considering alternatives. I'm liking your idea of a Msg.UserValue interface{}. What's nice about go's contexts is that they can allow for any number of key/value pairs. Maybe the solution therefore is to add the Msg.Context context.Context field but just not initialize it. Then, if a user wants this value to be set, they could do it themselves in their wrappers.

Here's what I'm thinking for a Subscriber wrapper:

func subscribeWrapper(fn nats.MsgHandler) nats.MsgHandler {
    return func(msg *nats.Msg) {
        if msg.Context == nil {
            ctx := context.Background()
            msg.Context = context.WithValue(ctx, "mykey", "myvalue")
        }
        fn(msg)
    }
}

Thoughts?

purple4reina avatar Sep 05 '19 23:09 purple4reina

Apologies for not getting back to this sooner. We are looking at this client for some work on generic headers.

I want to introduce a clean way to do middleware, the encoded connection was a bandaid for early on but is not sufficient.

derekcollison avatar Apr 27 '20 06:04 derekcollison