How to reconnect after a processing error occurred

Open reneklootwijk opened this issue 9 years ago • 1 comments

Upon receiving a message via Kafka, I have to do some processing. When the processing succeeds, the offset is committed; if it fails, the consumer is ended. When the cause of the failure has been solved, e.g. a connection to a backend system has been reestablished, the consumer should start consuming again. I try to simulate this behaviour like this:

    var counter = 0;
    var dataHandler = function (messageSet, topic, partition) {
        messageSet.forEach(function (m) {
            console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
            if(counter === 10) {
                counter = 0;
                setTimeout(function() {
                    consumer.init(strategies);
                }, 5000);
                consumer.end();
            } else {
                consumer.commitOffset({topic: topic, partition: partition, offset: m.offset});
                counter++;
            }
        });
    };

where after each 10th received message the consumer connection is ended and then after 5 seconds reestablished.
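The intended flow (commit on success, end the consumer on failure, start again later) can be sketched without a broker. This is a minimal sketch under stated assumptions, not a fix for the crash: `createFakeConsumer` and `handleBatch` are hypothetical names, the fake consumer is a stand-in that only records calls and is not the no-kafka API, and it assumes `init()`, `end()` and `commitOffset()` each return a promise. Unlike the snippet above, it stops iterating the batch at the first failure and waits for `end()` to settle before calling `init()` again.

```javascript
// Hypothetical stand-in for a consumer. It only records calls so the
// control flow can be exercised without a Kafka broker.
function createFakeConsumer() {
  const calls = [];
  return {
    calls,
    init() { calls.push('init'); return Promise.resolve(); },
    end() { calls.push('end'); return Promise.resolve(); },
    commitOffset(o) { calls.push('commit:' + o.offset); return Promise.resolve(); },
  };
}

// Process a batch in order. Commit after each successful message. On the
// first failure: stop iterating, end the consumer, wait restartDelayMs,
// then initialize it again. Returns true if the whole batch succeeded.
async function handleBatch(consumer, messages, processMessage, restartDelayMs) {
  for (const m of messages) {
    try {
      await processMessage(m);
    } catch (err) {
      await consumer.end(); // assumed to resolve once shutdown is complete
      await new Promise((resolve) => setTimeout(resolve, restartDelayMs));
      await consumer.init();
      return false; // the failed message and the rest stay uncommitted
    }
    await consumer.commitOffset({ offset: m.offset });
  }
  return true;
}
```

With a real client, `processMessage` would be the backend call, and the messages left uncommitted would be expected to be delivered again after the restart, depending on how the client resumes from the last committed offset.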

When the consumer is initialized again, the following log message is displayed:

    INFO consumer1 Joined group group1 generationId 27 as consumer1-d8c9faa4-32d7-4074-baaa-7f0091f7e481

and directly after that the consumer crashes, displaying:

    Unhandled rejection TypeError: Cannot read property 'isPending' of null
        at Client.updateMetadata (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/lib/client.js:201:37)
        at GroupConsumer._updateSubscriptions (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/lib/group_consumer.js:305:24)
        at /Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/lib/group_consumer.js:159:21
        at tryCatcher (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/util.js:16:23)
        at Promise._settlePromiseFromHandler (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:510:31)
        at Promise._settlePromise (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:567:18)
        at Promise._settlePromise0 (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:612:10)
        at Promise._settlePromises (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:691:18)
        at Async._drainQueue (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:138:16)
        at Async._drainQueues (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:148:10)
        at Immediate.Async.drainQueues [as _onImmediate] (/Users/rene/Documents/WebstormProjects/kafka/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:17:14)
        at processImmediate [as _immediateCallback] (timers.js:383:17)

What am I doing wrong?

reneklootwijk, Nov 23 '16 11:11

Nobody? Or is my question not clear?

reneklootwijk, Dec 01 '16 12:12