Segfault when closing the consumer while the consume loop is running
Reproduction code:
```javascript
const RdKafka = require('node-rdkafka');

function runConsumer() {
  const consumer = new RdKafka.KafkaConsumer({
    'group.id': 'test-group' + Math.random(),
    'bootstrap.servers': 'localhost:9092',
  }, {
    'auto.offset.reset': 'earliest',
  });

  consumer.connect();

  consumer.on('ready', () => {
    console.log("Consumer is ready");
    consumer.subscribe(['test-topic']);
    consumer.consume(); // start the flowing consume loop
  });

  consumer.on('data', (data) => {
    console.log("Received data");
    console.log(data);
    // Disconnecting while the consume loop is still running triggers the segfault.
    consumer.disconnect();
  });

  consumer.on('event.error', (err) => {
    console.error(err);
  });
}

runConsumer();
```
Cause: NodeKafka::Workers::KafkaConsumerConsumeLoop::HandleMessageCallback is called after KafkaConsumerConsumeLoop::Close. By that time the callback has already been cleared, so callback->Call is invoked on a cleared callback and causes a segfault.
This is caused by the worker.WorkComplete() call added to NodeDisconnect in kafka_consumer.cc in 2.16.1. It can also be reproduced with a double disconnect, a pause followed by a disconnect, or a connect and disconnect issued at the same time.
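To make the ordering concrete, here is a rough, synchronous JavaScript model of the race. It does not touch node-rdkafka at all: ConsumeLoopModel, enqueue, close and drainEventLoop are hypothetical names. A queued message stands in for one handed from the consume loop to the main thread, close() stands in for the callback being cleared during disconnect, and a TypeError on the null callback plays the role of the native segfault.

```javascript
// Hypothetical, synchronous model of the race; none of these names exist in node-rdkafka.
class ConsumeLoopModel {
  constructor(callback) {
    this.callback = callback; // stands in for the persistent JS callback
    this.pending = [];        // stands in for messages queued for the main thread
  }

  // The background consume loop hands a message over to the main thread.
  enqueue(message) {
    this.pending.push(message);
  }

  // Stands in for the teardown in NodeDisconnect: the callback is released
  // immediately, even though messages may still be queued.
  close() {
    this.callback = null;
  }

  // Stands in for HandleMessageCallback running later on the main thread.
  // There is no guard, so running after close() throws a TypeError.
  drainEventLoop() {
    while (this.pending.length > 0) {
      const message = this.pending.shift();
      this.callback(null, message);
    }
  }
}

const received = [];
const loop = new ConsumeLoopModel((err, msg) => received.push(msg));

loop.enqueue('m1');
loop.drainEventLoop(); // delivered normally
loop.enqueue('m2');    // the consume loop already fetched another message
loop.close();          // disconnect() runs before the queued message is handled

let crash = null;
try {
  loop.drainEventLoop(); // the queued message reaches a cleared callback
} catch (e) {
  crash = e;
}
console.log(received, crash instanceof TypeError);
```

In the native code there is no exception to catch, which is why the same ordering ends in a segfault instead.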
Also note that if getMetadata fails (on timeout), the client calls disconnect on its own here: /lib/client.js#L165, so if you call disconnect as well, you end up with a double disconnect. The problem could also be caused by GC on the JavaScript side, since a v8::Persistent can be GC'ed if the callback passed in goes out of scope.
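In the reproduction code, every 'data' event calls consumer.disconnect(), so several messages arriving together issue several disconnects. A minimal application-side sketch that collapses repeated requests into one call is shown below. It assumes the client's disconnect() accepts a completion callback, as node-rdkafka's does; disconnectOnce and the stub client are hypothetical. It only deduplicates the application's own calls: it cannot see a disconnect the library issues internally after a getMetadata timeout, and it does not remove the native race, which still needs the native-side check.

```javascript
// Hypothetical helper, not part of node-rdkafka: repeated disconnect requests
// share a single underlying client.disconnect() call.
// Assumes client.disconnect(cb) invokes cb(err) once when it finishes.
function disconnectOnce(client) {
  let pending = null; // Promise for the one real disconnect, once started
  return function requestDisconnect() {
    if (pending === null) {
      pending = new Promise((resolve, reject) => {
        client.disconnect((err) => (err ? reject(err) : resolve()));
      });
    }
    return pending;
  };
}

// Stub client used only to demonstrate the behaviour; it counts disconnect calls.
const stubClient = {
  calls: 0,
  disconnect(cb) {
    this.calls += 1;
    setImmediate(() => cb(null));
  },
};

const requestDisconnect = disconnectOnce(stubClient);
// Three 'data' events in a row, each asking to disconnect.
const results = [requestDisconnect(), requestDisconnect(), requestDisconnect()];
console.log(stubClient.calls); // 1
```

In the reproduction, the 'data' handler would call a requestDisconnect created from the consumer instead of calling consumer.disconnect() directly.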
The fix would be to check (callback && !callback->IsEmpty()) here: /src/workers.cc#L770, since that code can still run after worker->WorkComplete() has been called.
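The shape of the proposed guard can be sketched in plain JavaScript. The native callback types cannot be reproduced here, so CallbackSlot is a hypothetical stand-in: reset() plays the role of the teardown around worker->WorkComplete(), and call() only invokes the function when the slot is non-empty, mirroring the (callback && !callback->IsEmpty()) check.

```javascript
// Hypothetical JS model of the proposed guard; it mirrors the control flow only.
class CallbackSlot {
  constructor(fn) {
    this.fn = fn;
  }

  // Mirrors the teardown during disconnect: the callback is released.
  reset() {
    this.fn = null;
  }

  // Mirrors `callback && !callback->IsEmpty()`.
  isEmpty() {
    return typeof this.fn !== 'function';
  }

  // Returns true if the callback actually ran.
  call(...args) {
    if (this.isEmpty()) {
      return false; // late message after close: drop it instead of crashing
    }
    this.fn(...args);
    return true;
  }
}

const seen = [];
const slot = new CallbackSlot((err, msg) => seen.push(msg));

const beforeClose = slot.call(null, 'm1'); // delivered
slot.reset();                              // disconnect tears the worker down
const afterClose = slot.call(null, 'm2');  // late HandleMessageCallback
console.log(seen, beforeClose, afterClose);
```

Dropping the late message seems reasonable once a disconnect is underway, but whether to log or otherwise surface it is a judgment call for the actual patch.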
https://github.com/Blizzard/node-rdkafka/pull/1136
Thank you @constantind for the nice explanation.
As it took me some time to find the exact references, the following might help others save some time:
- the changes in the package "node-rdkafka" from v2.16.0 to v2.16.1 introduced a cleanup of the async worker in kafka_consumer.cc
- the WorkComplete() call mentioned above can be found in this package in kafka-consumer.cc#L1483
- the client disconnect line referenced above also points to "node-rdkafka"; in this package it is in client.js#L267
- the suggested callback check should therefore be added in this package in workers.cc#L771