nats-server

Jetstream: Receiving duplicate message retries

Open jbasants opened this issue 2 years ago • 9 comments

What version were you using?

nats-server Version: 2.9.21 Git: [b2e7725]

nats.net 1.0.8

What environment was the server running in?

Laptop with Windows 10 Enterprise x64 21H1 Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz 2.71 GHz 32.0 GB RAM

Is this defect reproducible?

The issue happens when a durable consumer of a JetStream stream re-delivers messages because no client acknowledged them.

We reproduce this by connecting 16 clients to the same JetStream durable consumer and then publishing 3 messages, once, to the stream subject. The messages are never acknowledged, so the server re-sends them after the ACK timeout. After some time, the same message appears in two different clients at the same time, as you can see in the image (the last set of messages): we get repeated messages for the same retry. Each line color represents a different client. The message format is:

[client id]<timestamp> message (info)

[Image: nats_cur]

We found that the NumDelivered metadata property is always one when there is a repeated message.

We've attached the code we are using. This was found using the .NET client, and initially reported there. The probability of occurrence is reduced by increasing the Fetch command timeout.

We were able to reproduce the effect on three different machines, using the same versions of nats-server and nats.net. It also occurs when the clients are in separate processes, and with default consumer and stream configurations.

We also tested this with acks (with a background producer that keeps sending unique messages), but no duplicates were found.

NatsEventsConsumer.zip

Given the capability you are leveraging, describe your expectation?

We expect that each message is only delivered to one client at a time. This way, we don't have two replicas processing the same message, if retries occur. To be clear, we are aware that if a retry occurs it is not guaranteed to end up in the same replica. The issue here is that the retry is sent in duplicate.

Given the expectation, what is the defect you are observing?

The same message is sent to two different clients, at the same time.

jbasants avatar Aug 31 '23 14:08 jbasants

Is the consumer push based, meaning you are using a delivery subject?

If so that is expected since the delivery subject used will be subscribed by all clients.

We do allow queue groups for push based consumers, or you can use pull based consumer.

derekcollison avatar Aug 31 '23 20:08 derekcollison

No, the consumer is pull based. We are continuously (with low timeout) fetching a batch of 1 from the subscription.

jbasants avatar Sep 01 '23 09:09 jbasants

I reproduced this against servers 2.9.22 and also saw it against 2.8.4 and 2.9.0

0. I added a field to the message object to hold a unique id made every time the object is constructed. This ensured it wasn't internally a duplicate, but in fact came from the server.

1. Create a stream:

{"retention":"limits","storage":"memory","discard":"old","name":"sname","subjects":["sub"],"max_age":0,"num_replicas":1,"duplicate_window":0}

2. Publish 3 messages to sub with data "message1", "message2" and "message3"

3. Create a consumer with ack wait of 15 seconds.

{"stream_name":"sname","config":{"durable_name":"cname","deliver_policy":"all","opt_start_seq":0,"ack_policy":"explicit","ack_wait":15000000000,"replay_policy":"instant","rate_limit_bps":0}}
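The `ack_wait` value in the JSON above is an integer number of nanoseconds, so 15000000000 is the 15 seconds named in step 3. As a minimal sketch of that conversion, assuming nothing beyond `java.time.Duration` (the class and method names are made up for illustration and are not part of any NATS client):

```java
import java.time.Duration;

// Hypothetical helper, not a NATS client API.
class JsDurations {
    // Duration -> the nanosecond integer that appears in the JSON bodies in this thread.
    static long toJsonNanos(Duration d) {
        return d.toNanos();
    }

    // Nanosecond integer from the JSON -> Duration.
    static Duration fromJsonNanos(long nanos) {
        return Duration.ofNanos(nanos);
    }

    public static void main(String[] args) {
        // ack_wait used for the consumer in step 3
        System.out.println(toJsonNanos(Duration.ofSeconds(15)));
        // the 10000000 used as the pull "expires" value in the loop described in this comment
        System.out.println(fromJsonNanos(10_000_000L).toMillis() + " ms");
    }
}
```

The same conversion applies to the `expires` field of the pull request, which is why `10000000` is annotated as 10 ms.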

4. Start 16 threads, each with a subscriber that simply loops. 16, because that's what the user had reported.

4.1 To make a subscriber, I made a pull subscription to the consumer via bind. Plain vanilla.
* I made an object for each subscriber to help track and print.
* The object used the subscription sid for an id (subId), and also to help see that the message sid matched.

Each subscriber thread did the following:

while (true) {
    A. make pull request: `{"batch":1,"expires":10000000}` // 10 ms
    B. message m = sub.nextMessage(20 millis) // 20ms is the amount of time I wait for a message to appear in the internal queue else return null
    C. if m != null, print message current time, msg.data, subId, msg.sid, msg.meta.streamSeq, msg.meta.Delivered, msg.meta.Pending, msg.meta.raw
}

This is example output.

15:36:07.928 message1, subId=4  msgsid=4  seq=1, delivered=1, pending=2, meta=$JS.ACK.sname.cname.1.1.1.1694806567895705500.2, msg.uid=L7wng4
15:36:07.929 message2, subId=2  msgsid=2  seq=2, delivered=1, pending=1, meta=$JS.ACK.sname.cname.1.2.2.1694806567897789600.1, msg.uid=L7wnj-
15:36:07.929 message3, subId=5  msgsid=5  seq=3, delivered=1, pending=0, meta=$JS.ACK.sname.cname.1.3.3.1694806567897789600.0, msg.uid=L7wnnE
15:36:22.938 message1, subId=3  msgsid=3  seq=1, delivered=2, pending=0, meta=$JS.ACK.sname.cname.2.1.4.1694806567895705500.0, msg.uid=L-WQ4G
15:36:22.940 message3, subId=3  msgsid=3  seq=3, delivered=2, pending=0, meta=$JS.ACK.sname.cname.2.3.6.1694806567897789600.0, msg.uid=L-WQ-S 
15:36:22.938 message2, subId=8  msgsid=8  seq=2, delivered=2, pending=0, meta=$JS.ACK.sname.cname.2.2.5.1694806567897789600.0, msg.uid=L-WQ7M
15:36:37.935 message1, subId=8  msgsid=8  seq=1, delivered=3, pending=0, meta=$JS.ACK.sname.cname.3.1.7.1694806567895705500.0, msg.uid=MBXj3U
15:36:37.935 message2, subId=14 msgsid=14 seq=2, delivered=3, pending=0, meta=$JS.ACK.sname.cname.3.2.8.1694806567897789600.0, msg.uid=MBXj6a
15:36:37.935 message3, subId=6  msgsid=6  seq=3, delivered=3, pending=0, meta=$JS.ACK.sname.cname.3.3.9.1694806567897789600.0, msg.uid=MBXj9g  
15:36:52.938 message2, subId=8  msgsid=8  seq=2, delivered=4, pending=0, meta=$JS.ACK.sname.cname.4.2.11.1694806567897789600.0, msg.uid=MEX9ni
15:36:52.938 message3, subId=8  msgsid=8  seq=3, delivered=4, pending=0, meta=$JS.ACK.sname.cname.4.3.12.1694806567897789600.0, msg.uid=MEX9qo
15:36:52.938 message1, subId=5  msgsid=5  seq=1, delivered=4, pending=0, meta=$JS.ACK.sname.cname.4.1.10.1694806567895705500.0, msg.uid=MEX9kc
15:37:07.946 message1, subId=4  msgsid=4  seq=1, delivered=5, pending=0, meta=$JS.ACK.sname.cname.5.1.13.1694806567895705500.0, msg.uid=MHZlZE
15:37:07.946 message2, subId=12 msgsid=12 seq=2, delivered=5, pending=0, meta=$JS.ACK.sname.cname.5.2.14.1694806567897789600.0, msg.uid=MHZlcK

Here's where it gets interesting. Notice that message3 is delivered twice, and in the second instance the delivered count drops back to 1, while message1 is not redelivered at all. In the next round it is back to normal.

15:37:22.947 message2, subId=8  msgsid=8  seq=2, delivered=6, pending=0, meta=$JS.ACK.sname.cname.6.2.16.1694806567897789600.0, msg.uid=MKaFek
15:37:22.947 message3, subId=7  msgsid=7  seq=3, delivered=6, pending=0, meta=$JS.ACK.sname.cname.6.3.17.1694806567897789600.0, msg.uid=MKaFhq
15:37:22.948 message3, subId=4  msgsid=4  seq=3, delivered=1, pending=0, meta=$JS.ACK.sname.cname.1.3.18.1694806567897789600.0, msg.uid=MKaFkw
15:37:37.965 message2, subId=4  msgsid=4  seq=2, delivered=7, pending=0, meta=$JS.ACK.sname.cname.7.2.20.1694806567897789600.0, msg.uid=MNfTo6
15:37:37.965 message1, subId=3  msgsid=3  seq=1, delivered=7, pending=0, meta=$JS.ACK.sname.cname.7.1.19.1694806567895705500.0, msg.uid=MNfTl0
15:37:37.966 message3, subId=14 msgsid=14 seq=3, delivered=7, pending=0, meta=$JS.ACK.sname.cname.7.3.21.1694806567897789600.0, msg.uid=MNfTsA
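
The `meta=` values in this output are nine-token reply subjects of the form `$JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<timestamp>.<pending>`, which is enough to spot the anomaly mechanically. The following is a rough sketch, not client code: `AckMeta` and `DeliveryCountCheck` are hypothetical names, and the parser only accepts the nine-token form shown in this log. It orders deliveries by consumer sequence and flags any delivery whose delivered count did not increase for its stream sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the fields of a nine-token "$JS.ACK..." subject as printed in the log.
class AckMeta {
    final String raw;
    final long delivered;
    final long streamSeq;
    final long consumerSeq;
    final long pending;

    private AckMeta(String raw, String[] t) {
        this.raw = raw;
        this.delivered = Long.parseLong(t[4]);
        this.streamSeq = Long.parseLong(t[5]);
        this.consumerSeq = Long.parseLong(t[6]);
        this.pending = Long.parseLong(t[8]);
    }

    static AckMeta parse(String subject) {
        String[] t = subject.split("\\.");
        if (t.length != 9 || !"$JS".equals(t[0]) || !"ACK".equals(t[1])) {
            throw new IllegalArgumentException("unexpected ack subject: " + subject);
        }
        return new AckMeta(subject, t);
    }
}

class DeliveryCountCheck {
    // Returns the subjects whose delivered count is not higher than the
    // last count accepted for the same stream sequence.
    static List<String> findRegressions(List<String> ackSubjects) {
        List<AckMeta> metas = new ArrayList<>();
        for (String s : ackSubjects) {
            metas.add(AckMeta.parse(s));
        }
        // Log lines from different threads can interleave, so order by consumer sequence.
        metas.sort(Comparator.comparingLong((AckMeta m) -> m.consumerSeq));

        Map<Long, Long> lastDelivered = new HashMap<>();
        List<String> suspicious = new ArrayList<>();
        for (AckMeta m : metas) {
            Long prev = lastDelivered.get(m.streamSeq);
            if (prev != null && m.delivered <= prev) {
                suspicious.add(m.raw);
            } else {
                lastDelivered.put(m.streamSeq, m.delivered);
            }
        }
        return suspicious;
    }

    public static void main(String[] args) {
        // The six meta values from the last two rounds of the log.
        List<String> log = Arrays.asList(
            "$JS.ACK.sname.cname.6.2.16.1694806567897789600.0",
            "$JS.ACK.sname.cname.6.3.17.1694806567897789600.0",
            "$JS.ACK.sname.cname.1.3.18.1694806567897789600.0",
            "$JS.ACK.sname.cname.7.2.20.1694806567897789600.0",
            "$JS.ACK.sname.cname.7.1.19.1694806567895705500.0",
            "$JS.ACK.sname.cname.7.3.21.1694806567897789600.0");
        System.out.println(findRegressions(log));
    }
}
```

Run against those six lines, the only subject flagged is the one with consumer sequence 18, where stream sequence 3 reports delivered=1 right after delivered=6.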

scottf avatar Sep 15 '23 19:09 scottf

@scottf is this a server issue that needs to be addressed or a client one? Not clear from above.

derekcollison avatar Sep 19 '23 13:09 derekcollison

This is a server issue. I can reproduce this in both Java and .NET. I think it has to do with the repeated pull requests with 10ms expiration. I ruled out the clients muxing or duplicating messages and saw that the re-delivery came from the server.

scottf avatar Sep 19 '23 17:09 scottf

Hey all. Any news?

jbasants avatar Oct 27 '23 10:10 jbasants

@jbasants While this is a legitimate server issue, I think it can easily be addressed on your side. The problem is around the 10ms pull expire time. What are you trying to accomplish here? Can you describe your use case and explain why you are doing this? Are you trying to limit the amount of time you wait? You could try a raw pull with no wait. I'm not sure either of these is the best pattern anyway: you are basically in a wait loop with continuous communication with the server, possibly causing a lot of CPU usage on the client and a lot of traffic and work for the server.

Is there any reason you can't use the simplification endless consume? It uses pull under the covers, and you can choose to poll for messages with next or have the messages delivered to a handler as they come in from the server. I think either of those options would solve your problem, again without knowing your use case.

scottf avatar Oct 27 '23 11:10 scottf

@scottf We want to manage lots of subscriptions (1000+) in the same app, so we don't want a thread for each subscription (as with the async subscription pattern). The solution we were evaluating is to have a single thread (or at least fewer threads) in the background querying every subscription for new messages. For that, we wanted a low request timeout so that the thread doesn't stay blocked for too long if there are no subscriptions.

I reckon that 10ms may be a bit low, and may put too much traffic on the server, but we can increase that number for sure. What worries us is that, if our system fails for some reason, and we get several message re-deliveries, we could get two replicas processing the same message. Are we sure that with a higher timeout this could not happen? Also, wouldn't a pull with no wait make this worse?

We could use simplification, but since the underlying mechanism is the same, it won't solve our threading problem, right?

Thanks for your help.

jbasants avatar Nov 02 '23 15:11 jbasants

@jbasants You can have many async subscriptions share the same dispatcher (thread). There is a simplification signature that accepts a dispatcher as a parameter.

MessageConsumer consume(ConsumeOptions consumeOptions, Dispatcher dispatcher, MessageHandler handler) throws IOException, JetStreamApiException;

This allows you complete control over the threading. You could also build your own dispatcher mechanism, meaning use a standard simplification consumer which does some basic check and then you dispatch or queue the message to some group of handlers. This seems to be similar to what you considered doing.
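
To illustrate the "build your own dispatcher" idea only, here is a rough sketch in plain `java.util.concurrent` with no NATS calls at all. `SharedDispatcher` and `SharedDispatcherDemo` are hypothetical names, and the messages are simulated strings; in a real application the `dispatch` call would be made from whatever receives messages from the client library. The point is that 1000 subscriptions can be served by a small, fixed number of threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical home-grown dispatcher: many subscriptions share a fixed pool of worker threads.
class SharedDispatcher {
    private final ExecutorService workers;
    private final BiConsumer<String, String> handler;

    SharedDispatcher(int threads, BiConsumer<String, String> handler) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.handler = handler;
    }

    // Queues one message for the shared pool; the caller does not block on the handler.
    void dispatch(String subscriptionId, String payload) {
        workers.execute(() -> handler.accept(subscriptionId, payload));
    }

    // Stops accepting new messages and waits for queued ones.
    // Returns false on timeout or if the waiting thread is interrupted.
    boolean drain(long timeoutMillis) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class SharedDispatcherDemo {
    // Simulates one message arriving on each subscription and returns how many were handled.
    static int run(int subscriptions, int threads) {
        AtomicInteger handled = new AtomicInteger();
        SharedDispatcher dispatcher =
            new SharedDispatcher(threads, (subId, payload) -> handled.incrementAndGet());
        for (int i = 0; i < subscriptions; i++) {
            dispatcher.dispatch("sub-" + i, "message" + i);
        }
        dispatcher.drain(5_000);
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4) + " messages handled by 4 threads");
    }
}
```

A fixed pool keeps the thread count independent of the number of subscriptions, at the cost of handlers for different subscriptions waiting on each other when the pool is busy.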

Another thing to consider here is giving your consumer a longer ack wait. If something happens, the message won't get redelivered for that period of time, giving you a bigger window to handle recovery.

If you must guarantee only-once processing, no matter what, you cannot simply rely on NATS. You must build some sequence tracking into the processing of messages.
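
One possible shape for such sequence tracking, as a sketch under stated assumptions rather than a recommended design: `SequenceTracker` is a hypothetical class that lets only the first caller claim a given stream sequence (the `streamSeq` value from the message metadata). It is in-memory and therefore only covers workers inside one process; workers in separate processes would need the same claim made against a shared store, which is not shown here.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process tracker keyed by stream sequence.
class SequenceTracker {
    private final Set<Long> claimed = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that claims this stream sequence.
    boolean tryClaim(long streamSeq) {
        return claimed.add(streamSeq);
    }

    // Gives the sequence back, e.g. after a failed attempt, so a later redelivery can be processed.
    void release(long streamSeq) {
        claimed.remove(streamSeq);
    }

    public static void main(String[] args) {
        SequenceTracker tracker = new SequenceTracker();
        // Two deliveries of stream sequence 3 arrive at almost the same time.
        System.out.println("first delivery claims seq 3:  " + tracker.tryClaim(3));
        System.out.println("second delivery claims seq 3: " + tracker.tryClaim(3));
        // Processing failed, so the sequence is released for the next redelivery.
        tracker.release(3);
        System.out.println("redelivery claims seq 3:      " + tracker.tryClaim(3));
    }
}
```

The set grows with every claimed sequence that is never released, so a real implementation would also need some way to expire or trim old entries.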

Can you explain what you mean by replicas? I'm not sure if you mean NATS stream replication or something else.

scottf avatar Nov 02 '23 22:11 scottf