
Add support for multiple replies

Open • jgaskins opened this issue 4 months ago • 2 comments

The folks at Synadia have written about sending one request and receiving many replies. There are several use cases for this:

"Request Many" archetypes

Max Replies

This is for when you're willing to accept up to N replies to your message, or when the number of replies is known ahead of time.

# Collect up to 10 replies to a single request
body = {status: "open", limit: 10}.to_json
replies = nats.request_many("orders.list", body, max_replies: 10)
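
For illustration, here's a minimal sketch of the collection loop this could use, assuming replies for the request get funneled into a Channel(NATS::Message) and that the call also accepts a total timeout. The channel plumbing and the collect_max helper are hypothetical, not the shard's actual internals:

# Hypothetical helper: gather up to max_replies messages from a reply channel,
# stopping early if the total timeout elapses first.
def collect_max(replies : Channel(NATS::Message), max_replies : Int32, total_timeout : Time::Span) : Array(NATS::Message)
  collected = Array(NATS::Message).new(max_replies)
  deadline = Time.monotonic + total_timeout

  while collected.size < max_replies
    remaining = deadline - Time.monotonic
    break if remaining <= Time::Span.zero

    select
    when msg = replies.receive
      collected << msg
    when timeout(remaining)
      break # deadline reached before hitting max_replies
    end
  end

  collected
end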

Sentinel Message

Pass a block that inspects replies to determine whether they're the "sentinel" message that marks the end of the reply series. The sentinel message is not returned. This is the pattern used in things like NATS Object Store to denote the end of the replies.

replies = nats.request_many("orders.list", body, timeout: 5.seconds) do |reply|
  # The reply is empty, so we've reached the end of the list.
  reply.data.empty?
end
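
Under the same hypothetical channel plumbing, the sentinel variant is the same loop, except the block decides when to stop and the sentinel itself is discarded. Again, this helper is only a sketch, not the shard's internals:

# Hypothetical helper: collect replies until the block identifies the sentinel
# message or the total timeout elapses; the sentinel is not returned.
def collect_until_sentinel(replies : Channel(NATS::Message), total_timeout : Time::Span, & : NATS::Message -> Bool) : Array(NATS::Message)
  collected = [] of NATS::Message
  deadline = Time.monotonic + total_timeout

  loop do
    remaining = deadline - Time.monotonic
    break if remaining <= Time::Span.zero

    select
    when msg = replies.receive
      break if yield msg # sentinel reached: stop without keeping it
      collected << msg
    when timeout(remaining)
      break # total timeout: return whatever arrived so far
    end
  end

  collected
end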

If the timeout passes (total timeout, that is — not per reply), all replies that have been received will be returned immediately. We don't want to wait indefinitely. If you need to determine whether you've reached the end, you should set a flag:

finished = false
replies = nats.request_many("orders.list", body, timeout: 5.seconds) do |reply|
  # The reply is empty, so we've reached the end of the list.
  finished = reply.data.empty?
end

if finished
  # We received the full list
else
  # We timed out, but maybe we can do something
end

Stall Timeout

This is useful when you have no idea how many messages you might receive, but you're willing to wait for messages to come in until N seconds have passed since the last message.

This pattern is useful for things like pinging all of the instances of your NATS microservice. You'll get a pong response from every endpoint instance, but the responses could come in quite staggered, especially if the instances are distributed around the world.

# Collect replies until 5 seconds pass without receiving a new one
replies = nats.request_many("orders.list", body, stall_timeout: 5.seconds)
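
The stall variant only needs to reset a per-message timer, again assuming the same hypothetical channel plumbing as the sketches above:

# Hypothetical helper: keep collecting replies until no new reply arrives
# within stall_timeout of the previous one.
def collect_until_stall(replies : Channel(NATS::Message), stall_timeout : Time::Span) : Array(NATS::Message)
  collected = [] of NATS::Message

  loop do
    select
    when msg = replies.receive
      collected << msg # each reply restarts the stall clock
    when timeout(stall_timeout)
      break # nothing arrived within stall_timeout of the last reply
    end
  end

  collected
end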

jgaskins • Jul 11 '25 03:07

I've been thinking about this some more and wondering whether it might be better to return an Iterator(NATS::Message) rather than an array. Two of these three archetypes have no upper bound on how many messages they return, and I don't think we should buffer a completely unbounded number of replies in memory.

The only downside is that, since this is for core NATS requests (not JetStream consumers), if we stream the messages in, the client must consume them quickly. If it doesn't and the subscription buffer fills up (64k messages deep by default), the NATS server will disconnect us. The NATS server optimizes for itself and is quick to terminate slow clients — I think the timeout is 2 seconds, which is why we buffer incoming messages.

So there isn't a clear winner here. Both implementations have tradeoffs. The array consumes more RAM and the iterator could get us disconnected if we aren't fast. Maybe we should support both? But I'm not sure what the API would look like for that.
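
To make the tradeoff concrete, the Iterator(NATS::Message) flavor could be as thin as a lazy wrapper over that same hypothetical reply channel, where whatever detects the end condition (max replies, sentinel, or stall) closes the channel and thereby stops the iterator. This is only a sketch of the shape, not a proposed implementation:

# Hypothetical: lazily yields replies as they arrive, so the caller must keep up
# or risk the slow-consumer disconnect described above.
class ReplyIterator
  include Iterator(NATS::Message)

  def initialize(@replies : Channel(NATS::Message))
  end

  def next
    @replies.receive
  rescue Channel::ClosedError
    stop
  end
end

Since Iterator includes Enumerable, callers that want the array behavior could simply call .to_a on the result, which would keep the buffering decision in their hands.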

jgaskins • Oct 30 '25 00:10

An additional "request many" archetype has been rattling around in my mind as well. This issue so far has been about sending 1 request and getting many replies. There are also use cases for sending N requests and receiving N responses. For example, let's say you're consuming a stream and each message contains a reference to a value in a KV bucket. It could be really nice to pipeline all of those requests and get their responses.

I don't know for certain what this API would look like, but off the top of my head, in my Redis shard, you can do redis.pipeline { |pipe| ... }, and the return value is an array containing all of the results. We could do something similar here.

# Proposed API: queue up one KV read per key and get all of the values back at once
nats.kv.get_many do |pipeline|
  keys.each do |key|
    pipeline.get bucket_name, key
  end
end

While that example is for NATS::KV, I have a feeling this will need support in NATS::Client. The client can match up messages to their original requests, but I don't think it exposes that knowledge anywhere.
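
For what that Client-level support might look like, here's a rough sketch of pipelining N requests over a single wildcard reply subscription and matching replies back to requests by inbox suffix. The inbox naming, the publish reply_to argument, and the subscribe call here are assumptions about the shard's API rather than its confirmed surface, and cleanup of the temporary subscription is omitted:

# Hypothetical: send every request with its own reply subject under one inbox
# prefix, then match replies back to their request index by subject suffix.
def pipeline_requests(nats, requests : Array({String, String}), total_timeout : Time::Span) : Array(NATS::Message?)
  inbox_prefix = "_INBOX.#{Random::Secure.hex(11)}"
  results = Array(NATS::Message?).new(requests.size, nil)
  channel = Channel({Int32, NATS::Message}).new(requests.size)

  # One wildcard subscription covers every pipelined request.
  nats.subscribe("#{inbox_prefix}.*") do |msg|
    index = msg.subject.rpartition('.').last.to_i
    channel.send({index, msg})
  end

  requests.each_with_index do |(subject, body), index|
    nats.publish(subject, body, reply_to: "#{inbox_prefix}.#{index}")
  end

  deadline = Time.monotonic + total_timeout
  requests.size.times do
    remaining = deadline - Time.monotonic
    break if remaining <= Time::Span.zero

    select
    when reply = channel.receive
      index, msg = reply
      results[index] = msg
    when timeout(remaining)
      break # total timeout reached; missing replies stay nil
    end
  end

  results
end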

jgaskins • Oct 30 '25 00:10