
All unacknowledged messages are delivered at once on subscription resumption

Open martin-spinks opened this issue 4 years ago • 2 comments

Background

We've noticed a behaviour that we would like to suggest a change for. We are running a number of subscribers (in both Node and Python), each subscribed with a durable name, a queue group and manual acks. When running multiple instances of these subscribers, messages are distributed across the instances, and subscribers can drop out and come back without problems (thanks to the durable name). Not all messages that are sent will be acknowledged: if a piece of work fails, its message is not acknowledged and nats-streaming will retry it later.

Request

We have noticed that if all the subscribers close their connections, the first one back gets a huge influx of unacknowledged messages (it can be a lot, and we suspect it may be the entire backlog). Could nats-streaming therefore honour the maxInFlight setting of the subscription when redelivering the unacknowledged messages on resumption?

We understand this is related to #732 and #187. We don't have any problem with unacknowledged messages being redelivered first on resume; this is actually fine for our use case. But having all of them redelivered at once is a problem, as there could be a lot of them. This could push the first subscription to come back into a sudden heavy-load situation.
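To make the request concrete, here is a minimal sketch of the behaviour we would expect. It is only an illustrative model, not how nats-streaming-server is implemented, and `createRedeliveryWindow` and its methods are hypothetical names. The backlog of unacknowledged sequences is handed out at most `maxInFlight` at a time, and another message is released only when an ack frees a slot.

```javascript
// Hypothetical model of window-limited redelivery (illustration only;
// this is NOT the nats-streaming-server implementation).
function createRedeliveryWindow(pendingSeqs, maxInFlight) {
  const backlog = [...pendingSeqs]; // unacknowledged sequences, oldest first
  const inFlight = new Set();       // delivered but not yet acknowledged

  // Deliver from the backlog until the window is full or the backlog is empty.
  const fill = (deliver) => {
    while (inFlight.size < maxInFlight && backlog.length > 0) {
      const seq = backlog.shift();
      inFlight.add(seq);
      deliver(seq);
    }
  };

  return {
    // Called when the subscription resumes.
    start: (deliver) => fill(deliver),
    // An ack for an in-flight sequence frees one slot; unknown acks are ignored.
    ack: (seq, deliver) => {
      if (inFlight.delete(seq)) fill(deliver);
    },
    inFlightCount: () => inFlight.size,
  };
}
```

With a backlog of five messages and a `maxInFlight` of 2, `start` would deliver only the first two sequences, and each subsequent `ack` would release exactly one more.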

To replicate

Assume a running nats-streaming-server and a subscriber (below, in Node) that only acknowledges messages whose payload is the string bob (subscriber.js):

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {
  // Generic function
  const caller = (msg) => {
    console.log('Function', msg.getSequence(), msg.getData());
    if (msg.getData() === 'bob') {
      console.log('Got a bob message');
      msg.ack()
    }
  }

  // Generic function
  const wrapper = (msg) => {
    caller(msg);
  }
  
  const exit = () => {
    transport.close();
  }

  // Set options
  const options = transport.subscriptionOptions();
  options.setAckWait(2000);
  options.setMaxInFlight(2);
  options.setManualAckMode(true);
  options.setDurableName('nlp.a');

  // Subscribe
  const subscription = transport.subscribe('nlp.a', 'q.nlp.a', options);
  subscription.on('message', wrapper);

  process.on('SIGTERM', exit);
  process.on('SIGINT', exit);
  process.on('SIGQUIT', exit);

  // Done
  console.log('Ready');
})

If we run multiple instances of this, messages are shared between the instances (queue group). If one instance is stopped and started again, it resumes and rejoins the group. We then send bob and non-bob messages via caller.js:

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {

  for (let i = 0; i < 8; i++) {
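    transport.publish('nlp.a', JSON.stringify({ a: 'a' }), function (err, uid) {
      // The callback fires when the server acknowledges the publish
      if (err) return console.error('Publish failed', err);
      console.log('Message received ', uid);
    });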
  }

  for (let i = 0; i < 8; i++) {
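    transport.publish('nlp.a', 'bob', function (err, uid) {
      // The callback fires when the server acknowledges the publish
      if (err) return console.error('Publish failed', err);
      console.log('Bob message received ', uid);
    });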
  }
})

You can see the bob messages appearing and being acknowledged, and after some time you can observe all the retries happening. If you then close both instances of subscriber.js and start just one instance of subscriber.js again, it receives more than maxInFlight messages (2 in this example). This number could get very large very quickly in our situation.
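To put a number on the overshoot rather than reading it off the console output, a small counter can be added to the subscriber. This is a sketch under assumptions: `createInFlightTracker` is a hypothetical helper, not part of node-nats-streaming, and it only counts distinct unacknowledged sequence numbers seen by one process.

```javascript
// Hypothetical helper (not part of node-nats-streaming): tracks the distinct
// unacknowledged sequences held by one subscriber process.
function createInFlightTracker(maxInFlight) {
  const outstanding = new Set(); // sequences delivered but not yet acked
  let peak = 0;                  // highest number of outstanding sequences seen

  return {
    // Record a delivery; returns false when the limit has been exceeded.
    delivered(seq) {
      outstanding.add(seq); // a redelivery of the same sequence counts once
      peak = Math.max(peak, outstanding.size);
      return outstanding.size <= maxInFlight;
    },
    // Record an acknowledgement sent by the subscriber.
    acked(seq) {
      outstanding.delete(seq);
    },
    peak: () => peak,
  };
}
```

In subscriber.js this could be wired in by calling `tracker.delivered(msg.getSequence())` at the top of `caller` and `tracker.acked(msg.getSequence())` right after `msg.ack()`. A `peak()` above 2 after restarting the single instance would confirm the behaviour described here.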

Environment

nats-streaming-server version 0.16.2, nats-server: v2.0.4

martin-spinks, Jan 14 '20 11:01