nats-server
Request more messages in NAK and TERM
When using JetStream in pull mode, there's no way to send an AckNak and request a new item. The same is true for AckTerm. It'd be nice to have a way to request the next message, as AckNext does, but with NAK and TERM.
We have processes consuming queues. When the handler hits an error it NAKs the message, and we need the message to be redelivered as soon as possible. Some of those errors are not retryable, so the handler sends a TERM instead. In both cases we want the consumer to fetch the next message.
Thanks.
I was fairly certain we supported this but I can't find reference to it now.
https://github.com/nats-io/nats-server/blob/4575d3620d71cca8fc3aabab9f269937cb919778/server/consumer.go#L1015-L1018
It calls processNextMsgReq only for +NXT acks, so if we are to support this we should do the same for the others.
There are some limitations I can see, maybe more:
- You'd have to send the next message request (JSApiConsumerGetNextRequest) in the acks
- You probably then can't have an ack to your ack
We'd add this to NAK and TERM, leaving the rest as is?
@derekcollison what do you think about supporting this? Any concerns? I agree with the use case; it definitely seems like something we should support.
I'm not aware of all use cases, but it seems that if AckAck and AckNext are two different things, there should also be AckNak and AckNakNext, in case one wants to just NAK without getting the next message.
Just thinking out loud: would it make sense to add next as an option? Something like -NAK {"next": 1} and even +ACK {"next": 1}.
Yes, the latter is what I'm saying we should do: support passing the get-next request as a JSON string after the ack.
The problem with doing this on NAK is that you'll then just get the same message forever in a loop: NAK doesn't put the message you NAK'd at the back of the list or ensure it goes to another consumer.
That's fine, isn't it? The message will just get redelivered. Adding backoff functionality would be great, but if that functionality is missing, as a JetStream client I shouldn't assume the message gets sent to the end of the queue.
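A minimal sketch of the proposed wire format, in Go since nats-server is written in Go. It assumes an ack body is the ack token (+ACK, -NAK, +TERM) optionally followed by a space and a JSON get-next request, as suggested above. The nextRequest type and its batch field are hypothetical stand-ins for whatever the get-next request would contain; this is not the server's actual request type or parsing code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// nextRequest is a HYPOTHETICAL get-next request body used only for this
// sketch; the "batch" field name is an assumption.
type nextRequest struct {
	Batch int `json:"batch"`
}

// ackPayload builds an ack body such as "-NAK", or "-NAK {"batch":1}" when a
// get-next request is attached, following the format proposed in the thread.
func ackPayload(kind string, next *nextRequest) (string, error) {
	if next == nil {
		return kind, nil // plain ack, no next message requested
	}
	body, err := json.Marshal(next)
	if err != nil {
		return "", err
	}
	return kind + " " + string(body), nil
}

func main() {
	for _, kind := range []string{"+ACK", "-NAK", "+TERM"} {
		p, err := ackPayload(kind, &nextRequest{Batch: 1})
		if err != nil {
			panic(err)
		}
		fmt.Println(p)
	}
}
```

Keeping the JSON optional means existing bare acks stay valid, which matches the suggestion to leave the other ack types as they are.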
Well, conceptually what you will see is:
q <- get msg
while true
  msg = read q
  if not_ready
    Nak msg, requesting next 1
  else
    process msg
    ackNext msg
In this loop, when you NAK the message and ask for the next one, the next one will be that same message. You'll cycle forever on the message you just NAK'd.
What we probably need is a NAK with a delay attached, i.e. you NAK it now and tell us not to deliver it again for 60 seconds. We'd then give you the next available messages, and only a minute later would you get the NAK'd message again. Otherwise you end up in a very fast loop that keeps getting the same message.
I think it should be supported. I believe NAK and TERM should also be able to take optional payloads that ask for delayed redelivery, or perhaps for movement to a different stream for DLQ-style semantics.
In our use case we NAK the message when there's an error in the handler. We retry the message a few times, and after some attempts we send it to the DLQ using advisories. The handler can also say the message is not retryable, in which case we send a TERM and it goes to the same DLQ.
So, using that code as a baseline, it'd look more like this:
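To make the difference concrete, here is a toy simulation in Go. It is a hypothetical model, not how nats-server schedules redeliveries: the consumer always hands out the lowest-sequence message that is currently eligible, message 1 is never ready and gets NAK'd, and every other message is acked. The clock does not advance during the loop.

```go
package main

import (
	"fmt"
	"time"
)

// pending is a message waiting for (re)delivery in this toy model.
type pending struct {
	seq       int
	notBefore time.Time // earliest time the message may be delivered (again)
}

// toyConsumer is a HYPOTHETICAL, heavily simplified pull consumer: it always
// hands out the lowest-sequence message that is eligible at the given time.
type toyConsumer struct {
	msgs []pending // ordered by seq
}

// next returns the lowest-sequence message eligible at time now.
func (c *toyConsumer) next(now time.Time) (int, bool) {
	for _, m := range c.msgs {
		if !now.Before(m.notBefore) {
			return m.seq, true
		}
	}
	return 0, false
}

// ack removes a message for good.
func (c *toyConsumer) ack(seq int) {
	for i, m := range c.msgs {
		if m.seq == seq {
			c.msgs = append(c.msgs[:i], c.msgs[i+1:]...)
			return
		}
	}
}

// nak keeps the message but makes it ineligible until now+delay.
func (c *toyConsumer) nak(seq int, now time.Time, delay time.Duration) {
	for i := range c.msgs {
		if c.msgs[i].seq == seq {
			c.msgs[i].notBefore = now.Add(delay)
		}
	}
}

// run drives the loop from the thread for a fixed number of steps: message 1
// is NAK'd with the given delay, all others are acked. It returns the order
// in which messages were handed out.
func run(delay time.Duration, steps int) []int {
	c := &toyConsumer{msgs: []pending{{seq: 1}, {seq: 2}, {seq: 3}}}
	now := time.Unix(0, 0)
	var order []int
	for i := 0; i < steps; i++ {
		seq, ok := c.next(now)
		if !ok {
			break // nothing eligible right now
		}
		order = append(order, seq)
		if seq == 1 {
			c.nak(seq, now, delay) // "Nak msg, requesting next 1"
		} else {
			c.ack(seq)
		}
	}
	return order
}

func main() {
	fmt.Println(run(0, 4))              // [1 1 1 1]: the tight loop
	fmt.Println(run(60*time.Second, 4)) // [1 2 3]: the delay lets others through
}
```

With no delay the loop only ever sees message 1; with a 60-second delay, messages 2 and 3 are delivered while message 1 waits.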
q <- get msg
while true
  msg = read q
  err = process msg
  if err {
    NAK or TERM
  } else {
    ACK
  }
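A runnable version of that loop's decision step, as a sketch. The errNotRetryable sentinel is a hypothetical convention for the handler to signal a permanent failure, and the function only returns the ack body (+ACK, -NAK, +TERM) rather than publishing it to NATS.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotRetryable is a HYPOTHETICAL sentinel the handler wraps to signal a
// permanent failure that should be TERM'd rather than retried.
var errNotRetryable = errors.New("not retryable")

// errTransient stands in for any ordinary, retryable failure.
var errTransient = errors.New("database timeout")

// decide maps a handler result to the ack body to send back:
// success -> +ACK, permanent failure -> +TERM, anything else -> -NAK.
func decide(err error) string {
	switch {
	case err == nil:
		return "+ACK"
	case errors.Is(err, errNotRetryable):
		return "+TERM"
	default:
		return "-NAK"
	}
}

func main() {
	results := []error{
		nil,
		errTransient,
		fmt.Errorf("bad payload: %w", errNotRetryable),
	}
	for _, err := range results {
		fmt.Println(decide(err)) // prints +ACK, then -NAK, then +TERM
	}
}
```

Using errors.Is lets handlers wrap the sentinel with extra context and still be classified as non-retryable.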
I was also wondering how to do retries and a DLQ. As far as I can tell, no message metadata is given when you fetch a message, so how can you tell whether it's getting close to max_delivery?
A NAK with a delay payload, plus message metadata on fetch (how many times the message has been delivered and what max_delivery is), would let us write stream consumers that retry messages with exponential backoff and then put them on a DLQ once they've been delivered too many times.
But it would be way nicer if JetStream handled the DLQ for us. Then we'd only need NAK with a delay payload.
The reply subject that allows one to ACK a message contains all the metadata you would need, including how many messages are pending after this one from the stream's perspective.
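Assuming the delivery count and max_deliver are available to the client, the backoff-then-DLQ decision could look like this sketch. The base delay, the cap, and doubling per attempt are assumptions; how the resulting delay would be passed to the server is exactly what this thread is asking for.

```go
package main

import (
	"fmt"
	"time"
)

// retryPlan decides what to do after a failed delivery. delivered is the
// 1-based delivery count and maxDeliver the consumer's limit. It returns the
// delay to request before the next attempt, or toDLQ=true once the limit is hit.
func retryPlan(delivered, maxDeliver int, base, maxDelay time.Duration) (delay time.Duration, toDLQ bool) {
	if delivered >= maxDeliver {
		return 0, true // no attempts left: hand the message to the DLQ instead
	}
	delay = base
	// Exponential backoff: base, 2*base, 4*base, ... stopping at the cap.
	for i := 1; i < delivered && delay < maxDelay; i++ {
		delay *= 2
	}
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay, false
}

func main() {
	for d := 1; d <= 5; d++ {
		delay, dlq := retryPlan(d, 5, time.Second, 30*time.Second)
		fmt.Printf("delivery %d: delay=%v dlq=%v\n", d, delay, dlq)
	}
}
```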
Does Term put the message into a configured DLQ? If so, there may be a reason to be able to attach some information to the Term so that the DLQ entry can be understood.
Messages delivered by a consumer carry a bunch of metadata: stream name, consumer name, sequences, how far from the end, and redelivery counts. All of this only makes sense in the consumer setting, not when using the msg get API to fetch directly from a stream.
Term messages do raise advisories, just like max-delivered ones, but on their own subject, so they can be used for the same DLQ-like purpose. It might certainly be interesting to attach a description to the TERM ack as context for operators.
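As an illustration of where that metadata lives, this sketch parses the original nine-token ack reply subject ($JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<timestamp>.<pending>). Newer servers can use a longer form that also carries domain and account tokens, which this sketch simply rejects; client libraries such as nats.go already expose the parsed values through Msg.Metadata(), so hand-parsing is rarely needed.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ackMeta holds the consumer metadata encoded in a JetStream ack reply subject.
type ackMeta struct {
	Stream, Consumer string
	Delivered        uint64 // how many times this message has been delivered
	StreamSeq        uint64
	ConsumerSeq      uint64
	TimestampNanos   int64
	Pending          uint64 // messages still pending after this one
}

// parseAckSubject parses only the original 9-token form
// $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<ts>.<pending>.
func parseAckSubject(subj string) (ackMeta, error) {
	t := strings.Split(subj, ".")
	if len(t) != 9 || t[0] != "$JS" || t[1] != "ACK" {
		return ackMeta{}, fmt.Errorf("unsupported ack subject %q", subj)
	}
	var m ackMeta
	m.Stream, m.Consumer = t[2], t[3]
	// Tokens 4..6 are delivered count, stream sequence and consumer sequence.
	for i, p := range []*uint64{&m.Delivered, &m.StreamSeq, &m.ConsumerSeq} {
		v, err := strconv.ParseUint(t[4+i], 10, 64)
		if err != nil {
			return ackMeta{}, err
		}
		*p = v
	}
	ts, err := strconv.ParseInt(t[7], 10, 64)
	if err != nil {
		return ackMeta{}, err
	}
	m.TimestampNanos = ts
	if m.Pending, err = strconv.ParseUint(t[8], 10, 64); err != nil {
		return ackMeta{}, err
	}
	return m, nil
}

func main() {
	m, err := parseAckSubject("$JS.ACK.ORDERS.worker.3.42.17.1650000000000000000.5")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s/%s delivered=%d stream_seq=%d pending=%d\n",
		m.Stream, m.Consumer, m.Delivered, m.StreamSeq, m.Pending)
}
```

Comparing Delivered against the consumer's max_deliver is how a handler could tell it is on its last attempt.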
Max delivery reached raises an advisory to $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.STREAM.CONSUMER
Ack Term raises one to $JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.STREAM.CONSUMER
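A DLQ worker could subscribe to both advisory subjects. The sketch below builds them from the prefixes above and decodes a made-up advisory payload. The advisory struct's field names (stream, consumer, stream_seq, deliveries) are assumptions to verify against the server's advisory schema; a real worker would use the stream sequence to fetch and republish the original message.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Advisory subject prefixes quoted in the thread; the stream and consumer
// names are appended as the last two tokens.
const (
	maxDeliveriesPrefix = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES"
	terminatedPrefix    = "$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED"
)

// dlqSubjects returns the advisory subjects a DLQ worker would subscribe to.
// Pass "*" for stream or consumer to match all of them.
func dlqSubjects(stream, consumer string) []string {
	return []string{
		maxDeliveriesPrefix + "." + stream + "." + consumer,
		terminatedPrefix + "." + stream + "." + consumer,
	}
}

// advisory is an ASSUMED subset of the advisory JSON body.
type advisory struct {
	Stream     string `json:"stream"`
	Consumer   string `json:"consumer"`
	StreamSeq  uint64 `json:"stream_seq"`
	Deliveries uint64 `json:"deliveries"`
}

func main() {
	fmt.Println(dlqSubjects("ORDERS", "*"))

	// A made-up payload used only to exercise the decoder.
	raw := []byte(`{"stream":"ORDERS","consumer":"worker","stream_seq":42,"deliveries":5}`)
	var a advisory
	if err := json.Unmarshal(raw, &a); err != nil {
		panic(err)
	}
	// The stream sequence identifies the original message to copy to the DLQ.
	fmt.Printf("copy %s seq %d to the DLQ after %d deliveries\n", a.Stream, a.StreamSeq, a.Deliveries)
}
```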
Yes, I meant the operator context (like why we sent a TERM on it). For the max-retry scenario it's more obvious.
@derekcollison seems reasonable to me: add an optional reason argument to AckTerm that will then show in the term advisory?
I am ok with that.
+1 for NAK with a delay payload. Exponentially delaying message retries is a core feature of work/task queues.
It is not clear from the description what AckTerm does in a WorkQueue stream:
A) does it delete the message / send it to the DLQ for all consumers, or
B) stop redelivering it to the current consumer and redeliver it to other consumers?
It would be a good extension for AckTerm to let you explicitly choose option A or B.
In a WQ only one consumer can get a message. There are no overlaps on consumers allowed.
So these questions don’t really apply to WQ streams
@ripienaar what overlap? If a consumer can't process the message it will AckTerm it, but maybe some other consumer (another application or version) can process it successfully.
Actually, the message may end up stuck in the stream until the consumer is deleted and recreated. But I think we fixed that a while ago; I'm not sure. You should try it.
Term removes the message from the WQ.
By overlap I mean two consumers that operate on the same subject. That's not supported.
AckTerm does a logical internal Ack and raises a Term advisory. If the stream uses interest or work-queue retention, the message will be removed.
You may want NAK behavior if you think someone else can handle it.
After reading this thread, I think the best option currently is simply not to call anything and let the ack time out. This produces the redelivery delay.
The ability to reject an unknown/unprocessable message is mostly needed in update situations, where the order in which the publisher and consumer are updated cannot be guaranteed.
There are many scenarios where updated publishers produce new message types/versions that consumers which have not been updated yet can't handle, while those consumers are still running for a short time.