nats.go

Add a Request() that has an option to wait for multiple results rather than just 1

Open matthiashanel opened this issue 5 years ago • 9 comments

@derekcollison should we look at adding a Request() that has an option to wait for multiple results rather than just 1 to nats.go instead of adding more here? Would be very useful.

Originally posted by @ripienaar in https://github.com/nats-io/jetstream/pull/279#issuecomment-671290648

matthiashanel avatar Aug 12 '20 20:08 matthiashanel

More thought needs to go into it for sure, but I can see this here. I sense it's a bit different than what we have in the server with singleton, stream or chunked. This feels more like a barrier pattern.

derekcollison avatar Aug 12 '20 23:08 derekcollison

@ripienaar @derekcollison @wallyqs

I've been needing this again and ended up with something that looks a lot like the snippet below. Would this be an acceptable API/name/implementation?

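// RequestN publishes a request and hands up to limitMsgs replies to cb until
// timeout expires; limitMsgs == 0 means no limit.
// Written as a helper taking *nats.Conn so it compiles outside the nats
// package; inside nats.go it would be a method on Conn.
// Requires imports "time" and "github.com/nats-io/nats.go".
func RequestN(nc *nats.Conn, subject string, reqData []byte, timeout time.Duration, limitMsgs int, cb nats.MsgHandler) error {
	ib := nats.NewInbox()
	sub, err := nc.SubscribeSync(ib)
	if err != nil {
		return err
	}
	// drop the short-lived inbox subscription on every return path
	defer sub.Unsubscribe()
	if err := nc.PublishRequest(subject, ib, reqData); err != nil {
		return err
	}
	responses := 0
	end := time.Now().Add(timeout)
	// receive until timeout is exceeded OR limit is hit
	for limitMsgs == 0 || responses < limitMsgs {
		remaining := time.Until(end)
		if remaining <= 0 {
			break
		}
		resp, err := sub.NextMsg(remaining)
		if err != nil {
			// a timeout after at least one reply is not an error
			if err != nats.ErrTimeout || responses == 0 {
				return err
			}
			break
		}
		cb(resp)
		responses++ // the counter was never advanced before, so the limit could not trigger
	}
	return nil
}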

matthiashanel avatar Oct 12 '20 17:10 matthiashanel

We have been playing with variable options to Subscribe() and possibly Request(). Expecting more than 1 response, and whether it is chunked or streamed, might be a good thing to put there vs. a new API.

Jury is definitely out though on this approach vs just adding in lots of new APIs.

derekcollison avatar Oct 12 '20 19:10 derekcollison

Might be good for @ripienaar and @kozlovic to weigh in..

derekcollison avatar Oct 12 '20 19:10 derekcollison

Agreed that going the option way makes it more extensible and avoids an explosion of APIs.

kozlovic avatar Oct 12 '20 19:10 kozlovic

Just brought it up because I am repeatedly facing requests with more than one response.

This is essentially a request, but with more than one response. So it can't fit in the request API, because there is more than one message. It does not quite fit in the subscribe API either, since it should block in place just like a request. Having to repeatedly handle short-lived subscriptions to receive up to n messages in a certain time is basically what this API is meant to avoid.

matthiashanel avatar Oct 12 '20 19:10 matthiashanel

Right, sorry, I missed the fact that it has to be a different API for that one, since the current Request() returns a message. We can't have it return nil with no error just because some new options were passed. Although, in that case (passing new options), the user would know that Request() is not going to return a message, since he/she would have intentionally added the option that triggers a "request N" behavior.

kozlovic avatar Oct 12 '20 19:10 kozlovic

Would this be good for the NKIT APIs vs core NATS?

ColinSullivan1 avatar Oct 12 '20 19:10 ColinSullivan1

Just my 5c: I suppose the suggested code may benefit from a call to AutoUnsubscribe(limitMsgs).

Kazmirchuk avatar Sep 28 '21 21:09 Kazmirchuk