nats-streaming-server
nats-streaming-server copied to clipboard
All unacknowledged messages are delivered at once on subscription resumption
Background
We've noticed a behaviour we would like to suggest a change for. We are running a number of subscribers (in both node and python), these are subscribed with a durable name
a queue group
and manual acks
. When running multiple instances of these subscribers, messages are distributed across the instances and subscribers can drop out and come back fine (durable name). Not all messages that are sent will be acknowledged, if a piece of work fails its message is not acknowledged and nats-streaming
will retry later.
Request
We have noticed that if all the subscribers close their connections the first one back gets a huge influx of unacknowledged messages (it can be a lot and we suspect it maybe the entire backlog). Can nats-streaming
therefore honour the maxInFlight
setting for that channel when attempting to re-deliver all the unacknowledged messages on resumption?
We understand this is related to #732 and #187. We don't have any problem with unacknowledged messages being redelivered first on resume, this is actually fine for our use case. But having all messages re-delivered at once is a problem as there could be a LOT of them. This could send the first listening subscription into a sudden heavy load situation.
To replicate
If we have a running nats-streaming-server
and we have a subscriber (below in Node) that only acknowledges messages that are strings that equal bob
. (subscriber.js)
const stan = require('node-nats-streaming');
const os = require('os');
// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');
// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {
// Generic function
const caller = (msg) => {
console.log('Function', msg.getSequence(), msg.getData());
if (msg.getData() === 'bob') {
console.log('Got a bob message');
msg.ack()
}
}
// Generic function
const wrapper = (msg) => {
caller(msg);
}
const exit = () => {
transport.close();
}
// Set options
const options = transport.subscriptionOptions();
options.setAckWait(2000);
options.setMaxInFlight(2);
options.setManualAckMode(true);
options.setDurableName('nlp.a');
// Subscribe
const subscription = transport.subscribe('nlp.a', 'q.nlp.a', options);
subscription.on('message', wrapper);
process.on('SIGTERM', exit);
process.on('SIGINT', exit);
process.on('SIGQUIT', exit);
// Done
console.log('Ready');
})
If we run multiple instances of this, messages are shared between the instances (queue name), if one instance is stopped and started it will resume and join the group again. If we send bob
messages and non bob messages via: (caller.js)
const stan = require('node-nats-streaming');
const os = require('os');
// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');
// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {
for (let i = 0; i < 8; i++) {
transport.publish('nlp.a', JSON.stringify({ a: 'a' }), function (err, uid) {
console.log('Message recieved ', uid);
})
}
for (let i = 0; i < 8; i++) {
transport.publish('nlp.a', 'bob', function (err, uid) {
console.log('Bob message recieved ', uid);
})
}
})
You can see bob messages appearing and being acknowledged and after sometime you can observe all the retries happening. If you then close both instances of the of the subscriber.js
and start just one instance of subsriber.js
again; it receives more that the maxInFlight
messages (in this example 2). This number could get very large very quickly in our situation.
Environment
nats-streaming-server version 0.16.2, nats-server: v2.0.4