nats-server
Support duplicate handling across server restarts
Hi! I have noticed some strange and potentially inconsistent behaviour in the duplicate handling of the NATS server.
When I try to add a duplicate message to a work queue on a running NATS instance, it is not added, with or without purges. This is the expected behaviour. However, when I restart the server after a purge, the duplicate message is added again. The expected behaviour would be that the server rejects the duplicate, just as it does when there is no server restart.
Versions of nats-server and affected client libraries used:
Server: 2.8.4 (current nats:2.8 on Docker Hub)
CLI: 0.0.33
OS/Container environment:
Server launched via docker-compose (see below)
CLI installed via Homebrew
Steps or code to reproduce the issue:
Start server with this docker-compose:
services:
  nats-server:
    image: nats:2.8
    ports:
      - "4222:4222"
    command: "--jetstream --store_dir=/data -DV"
    volumes:
      - "persistence:/data"
volumes:
  persistence:
Add stream
nats stream add mystream --storage=file --subjects=subj --replicas=1 --retention=work --discard=old --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=-1 --max-age=-1 --max-msg-size=-1 --dupe-window=8h --allow-rollup --no-deny-delete --no-deny-purge
Publish message
nats pub subj "Hello" -H "Nats-Msg-Id:1"
Check number of messages, returns 1 (as expected)
nats stream info mystream | grep "Messages"
Purge stream
nats stream purge mystream
Publish same message again
nats pub subj "Hello" -H "Nats-Msg-Id:1"
Check number of messages, returns 0 (as expected) -> Duplicate detection works as expected.
nats stream info mystream | grep "Messages"
When I restart the server (docker-compose down && docker-compose up) after the purge, publishing the original message again actually adds the message to the queue, even though it is a duplicate:
nats pub subj "Hello" -H "Nats-Msg-Id:1"
Check number of messages, returns 1 (NOT as expected) -> Duplicate detection behaves differently across server restarts.
nats stream info mystream | grep "Messages"
Expected result:
The message is not added to the queue, as it is a duplicate of a previously added message. The server is instructed to persist its state in an external location.
Actual result:
The message is added to the queue, even though it is a duplicate, if the server has been restarted in the meantime. This is potentially inconsistent behaviour, as the message is NOT added if the server is NOT restarted. This gives the impression that instructing the server to persist its state (via the --store_dir parameter) does not persist everything state-related across restarts.
The system recovers duplicate IDs from stored messages upon restart. Currently there is no additional information saved for msgIDs separately from the original messages. So when you purged them and restarted the server, the system had no state from which to rebuild the msgID map.
Thanks for the quick response. Your explanation is understandable, though this might not be the behaviour users expect. Please consider this issue to be a feature request, then.
I noticed the same issue when messages are successfully picked up and processed from my work queue, so this is not limited to purges. I assume that is for the same reason. Is there a way I could work around this for now? It is important for my use case that only non-duplicate messages end up on my work queue and that this behaviour is consistent across server restarts.
For now I would remove work-queue or interest-based retention policies and make them limits-based, with the max age set to the same value as your duplicate ID window.
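For the stream above that would look roughly like this (an untested sketch; the flags mirror the original stream add command, with only the retention policy and max age changed to match the 8h dupe window):
nats stream add mystream --storage=file --subjects=subj --replicas=1 --retention=limits --discard=old --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=-1 --max-age=8h --max-msg-size=-1 --dupe-window=8h --allow-rollup --no-deny-delete --no-deny-purge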
Thanks, I prefer to use the KV store instead; it looks to me like a nice solution that might also hint at a potential solution path on the NATS side...
So you are using KV and msgIDs? How does purging plus duplicate detection via a KV play out in this scenario?
I only add items to the queue if they are not known to the KV, and I add items to the KV when I add things to the queue. It might be a rudimentary hack, but it works for me...
https://github.com/heussd/nats-rss-article-url-feeder/blob/main/main.py#L25-L30
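In rough terms the pattern is this (a minimal sketch using nats-py against a local server; the bucket name, subject and example URL are illustrative, and the URL is hashed here only to keep the KV key valid, while the linked code keys on the URL directly):

import asyncio
import hashlib

import nats
from nats.js.errors import KeyNotFoundError


async def publish_if_new(url: str) -> None:
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()

    # Illustrative bucket name; any KV bucket you own works.
    kv = await js.create_key_value(bucket="seen_urls")

    # Hash the URL so the KV key only contains characters that are always valid.
    key = hashlib.sha256(url.encode()).hexdigest()

    try:
        # Key exists -> this URL was already queued, so skip it.
        await kv.get(key)
    except KeyNotFoundError:
        # Unknown URL: enqueue it (assumes the "subj" stream from above exists),
        # then remember it in the KV.
        await js.publish("subj", url.encode(), headers={"Nats-Msg-Id": url})
        await kv.put(key, url.encode())

    await nc.close()


if __name__ == "__main__":
    asyncio.run(publish_if_new("https://example.com/article"))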
What is the high-level goal?
I have a component that feeds URLs from the internet as messages. It has no state and no idea of time. But messages are uniquely identifiable by their URL, so the URL is what I use as the key for the KV.
What is the queued work you need to do when adding a new URL? Could you use a KV watcher instead for the processing?
I'll look into this, thanks!