node-amqp icon indicating copy to clipboard operation
node-amqp copied to clipboard

Stream errors when using queue.shift() and prefetchCount

Open matomesc opened this issue 11 years ago • 3 comments

Versions used: [email protected], [email protected] and [email protected].

I have a consumer that is supposed to pull 5 messages off a queue, process and ack them. It looks like:


var queue = {
       subOptions: { ack: true, prefetchCount: 5 },
       /* other options */
};

connection.queue(queue.name, queue.options, function (q) {
    q.bind(queue.bindOptions.exchange, queue.bindOptions.key);
    q.subscribe(queue.subOptions, function (message, headers, deliveryInfo, ack) {
        work(message, function (err, result) {
            if (err) {
                log.error(err);
            }
            return q.shift();
        });
    });
});

When i start running the consumer, i'm getting a bunch of the following errors on the q.shift():

Error: write after end
at writeAfterEnd (_stream_writable.js:130:12)
at Socket.Writable.write (_stream_writable.js:178:5)
at Socket.write (net.js:613:40)
at Connection.self.(anonymous function) [as write] (/tmp/node_modules/amqp/lib/connection.js:607:27)
at Connection._sendMethod (/tmp/node_modules/amqp/lib/connection.js:782:8)
at Message.acknowledge (/tmp/node_modules/amqp/lib/message.js:52:25)
at Queue.shift (/tmp/node_modules/amqp/lib/queue.js:203:25)
at /tmp/test.js:50:14
at /tmp/test.js:182:10
at /tmp/test.js:128:12

Also i'm also seeing the following errors randomly but with various tags (5, 7, 9, 21, 40) and identical stacks:

Error: PRECONDITION_FAILED - unknown delivery tag 21
at Queue._onMethod (/tmp/node_modules/amqp/lib/queue.js:482:15)
at Queue.Channel._onChannelMethod (/tmp/node_modules/amqp/lib/channel.js:85:12)
at Connection._onMethod (/tmp/node_modules/amqp/lib/connection.js:428:28)
at AMQPParser.self.parser.onMethod (/tmp/node_modules/amqp/lib/connection.js:133:12)
at AMQPParser._parseMethodFrame (/tmp/node_modules/amqp/lib/parser.js:360:10)
at frameEnd (/tmp/node_modules/amqp/lib/parser.js:93:16)
at frame (/tmp/node_modules/amqp/lib/parser.js:78:14)
at AMQPParser.header [as parse] (/tmp/node_modules/amqp/lib/parser.js:64:14)
at AMQPParser.execute (/tmp/node_modules/amqp/lib/parser.js:137:21)
at Connection.<anonymous> (/tmp/node_modules/amqp/lib/connection.js:171:21)

Any idea about what's going on here?

matomesc avatar Aug 26 '14 23:08 matomesc

I'm not sure if this is the issue, but .shift() on a queue will ack the last received message. Maybe prefetch count is screwing that up and acknowledging the last received message 5 times? Unknown delivery tag tends to indicate you've acknowledged a message more than once.

amackera avatar Aug 29 '14 16:08 amackera

Instead of q.shift(), i just did ack.acknowledge() which solved the problem. What exactly is the point of .shift()?

matomesc avatar Aug 30 '14 01:08 matomesc

q.shift() is a terrible idea. combined with prefetchCount it's a bad bad combo. should be deprecated, shout warnings on use and be removed pronto.

algesten avatar Apr 18 '16 04:04 algesten