Add support for multiple replies
The folks at Synadia have written about sending one request and receiving many replies. There are several use cases for this:
- Chunked responses, such as fetching object contents from the NATS Object Store
- Fanning out a request across many recipients. This concept is used by the NATS Services framework to ping, get stats, or get info about services and endpoints.
"Request Many" archetypes
Max Replies
This is for when you're willing to accept up to N replies to your message, or when the number of replies is known ahead of time.
body = {status: "open", limit: 10}.to_json
replies = nats.request_many("orders.list", body, max_replies: 10)
Sentinel Message
Pass a block that inspects replies to determine whether they're the "sentinel" message that marks the end of the reply series. The sentinel message is not returned. This is the pattern used in things like NATS Object Store to denote the end of the replies.
replies = nats.request_many("orders.list", body, timeout: 5.seconds) do |reply|
# The reply is empty, so we've reached the end of the list.
reply.data.empty?
end
If the timeout elapses (it's a total timeout, not a per-reply timeout) before the sentinel arrives, all replies received so far are returned immediately; we don't want to wait indefinitely. If you need to determine whether you actually reached the end of the list, set a flag:
finished = false
replies = nats.request_many("orders.list", body, timeout: 5.seconds) do |reply|
# The reply is empty, so we've reached the end of the list.
finished = reply.data.empty?
end
if finished
# We received the full list
else
# We timed out, but maybe we can do something
end
Stall Timeout
This is useful when you have no idea how many messages you might receive, but you're willing to wait for messages to come in until N seconds have passed since the last message.
This pattern is useful for things like pinging all of the instances for your NATS microservice. You'll get a pong response from all endpoint instances, but they could come in pretty staggered, especially if they're distributed around the world.
replies = nats.request_many("orders.list", body, stall_timeout: 5.seconds)
I've been thinking about this some more and wondering whether it might be better to return an Iterator(NATS::Message) rather than an array. Two of these three archetypes have no upper bound on how many messages they return, and I don't think we should buffer a completely unbounded number of replies in memory.
The only downside is that, since this is for core NATS requests (not JetStream consumers), streaming the messages in means the client must consume them quickly. If it doesn't and the subscription buffer fills up (64k messages deep by default), the NATS server will disconnect us. The server optimizes for itself and is quick to terminate slow consumers (I think the timeout is 2 seconds), which is why we buffer incoming messages in the first place.
So there isn't a clear winner here. Both implementations have tradeoffs. The array consumes more RAM and the iterator could get us disconnected if we aren't fast. Maybe we should support both? But I'm not sure what the API would look like for that.
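For the sake of discussion, a streaming variant could look something like the sketch below. To be clear, this is hypothetical: the request_each name, its Iterator(NATS::Message) return value, and handle_order_page are all made up here, not part of this PR.

# Hypothetical: a streaming counterpart to request_many that yields replies
# as they arrive instead of buffering them all into an array first.
nats.request_each("orders.list", body, stall_timeout: 5.seconds).each do |reply|
  # Replies must be consumed promptly so the subscription buffer doesn't fill up
  handle_order_page reply.data
end

The appeal is constant memory usage; the risk is exactly the slow-consumer disconnect described above.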
An additional "request many" archetype has been rattling around in my mind as well. This PR so far has been about sending 1 request and getting many replies, but there are also use cases for sending N requests and receiving their N responses. For example, let's say you're consuming a stream and each message contains a reference to a value in a KV bucket. It could be really nice to pipeline all of those requests and get their responses back together.
I don't know for certain what this API would look like, but off the top of my head, in my Redis shard, you can do redis.pipeline { |pipe| ... }, and the return value is an array containing all of the results. We could do something similar here.
# Pipeline all of the gets; the return value would be an array of their results
values = nats.kv.get_many do |pipeline|
  keys.each do |key|
    pipeline.get bucket_name, key
  end
end
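If get_many returns the results in the same order the gets were pipelined (that ordering guarantee is an assumption at this point), consuming them could look like:

keys.each_with_index do |key, index|
  # values[index] would line up with keys[index] if results preserve request order
  puts "#{key} => #{values[index].inspect}"
end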
While that example is for NATS::KV, I have a feeling this will need support in NATS::Client. The client can already match up reply messages to their original requests internally, but I don't think it exposes that knowledge anywhere.
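In the meantime, here's a rough sketch of how you could fake pipelining with fibers and a Channel, which also hints at what the client-level bookkeeping needs to track. Everything here is illustrative: it assumes a blocking request method that returns the reply (or nil on timeout), and the "inventory.get" subject is made up.

# Hypothetical: fire off N requests concurrently, one fiber each, and collect
# the replies keyed by the value that produced them.
channel = Channel({String, NATS::Message?}).new

keys.each do |key|
  spawn do
    channel.send({key, nats.request("inventory.get.#{key}", "")})
  end
end

results = {} of String => NATS::Message?
keys.size.times do
  key, reply = channel.receive
  results[key] = reply
end

A first-class API could do this more efficiently by matching replies to requests inside the client itself, which is exactly the knowledge I'd like NATS::Client to expose.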