kafkajs
Need help: safe parallel eachBatch (duplicates + rebalances + memory leak)
Problem Summary
We are trying to use eachBatch() with parallel message processing inside the batch, while keeping heartbeats alive (to avoid rebalances).
However, this leads to the following problems:
- Duplicate message processing (even after resolveOffset and commitOffsetsIfNecessary complete successfully)
- Frequent partition rebalances while a batch is being processed
- Memory usage that keeps growing over time (possible memory leak)
- The heartbeat loop sometimes appears stuck (it is not always cleaned up)
- Offsets replayed after a rebalance, even though commitOffsetsIfNecessary was called
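The duplicates and the replay after a rebalance are at least partly explained by calling resolveOffset from parallel tasks. As far as we can tell from the KafkaJS source, `resolveOffset(offset)` overwrites the partition's resolved position with `offset + 1` rather than keeping the maximum. Tasks that finish out of order can therefore move that position backwards (so already-processed messages are redelivered) or past a message that is still running (so a later failure of that message is skipped). A minimal sketch of a hypothetical helper, `InOrderOffsetTracker` (not a KafkaJS API), that only ever reports the end of the contiguous run of completed offsets:

```javascript
// Hypothetical helper, not part of KafkaJS. Given the offsets of one batch in
// partition order, it reports the highest offset that is safe to pass to
// resolveOffset(): the last offset of the contiguous run of completed messages.
class InOrderOffsetTracker {
  constructor(offsets) {
    this.offsets = offsets; // batch offsets in partition order (strings, like message.offset)
    this.done = new Set(); // completed offsets that are not yet part of the prefix
    this.next = 0; // index of the first offset that has not completed
  }

  // Record a successfully processed offset. Returns the new safe offset, or
  // null if the completed prefix did not grow (an earlier message is pending).
  markDone(offset) {
    this.done.add(offset);
    let safe = null;
    while (this.next < this.offsets.length && this.done.has(this.offsets[this.next])) {
      safe = this.offsets[this.next];
      this.done.delete(safe);
      this.next += 1;
    }
    return safe;
  }
}

// Offsets 5, 6 and 7 finish in the order 7, 5, 6.
const tracker = new InOrderOffsetTracker(['5', '6', '7']);
const afterSeven = tracker.markDone('7'); // null: 5 and 6 are still running
const afterFive = tracker.markDone('5'); // '5'
const afterSix = tracker.markDone('6'); // '7': 6 closes the gap, so 7 is covered too
```

Inside eachBatch, one tracker would be created per batch from `batch.messages.map((m) => m.offset)`, and `resolveOffset(message.offset)` replaced by `const safe = tracker.markDone(message.offset); if (safe !== null) resolveOffset(safe);`. A failed message is never marked done, so the resolved position stops in front of it. With the default `eachBatchAutoResolve: true`, however, KafkaJS still resolves the batch's last offset once eachBatch returns without throwing, so the failure would also have to be rethrown (or auto-resolve disabled) for this to prevent skipped messages.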
Our Current Consumer Code
```javascript
await consumer.run({
  eachBatch: async ({
    batch,
    commitOffsetsIfNecessary,
    resolveOffset,
    heartbeat,
    pause,
    isRunning,
    isStale
  }) => {
    let running = true;

    // Heartbeat loop to prevent rebalance
    const hb = (async () => {
      while (running) {
        await heartbeat();
        await new Promise((r) => setTimeout(r, 1000));
      }
    })();

    // Parallel message processing
    const promises = batch.messages.map(async (message) => {
      logger.info(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset}`);
      const payload = message.value.toString('utf-8');
      try {
        await PlanPageOccUpdation.handle(JSON.parse(payload));
        resolveOffset(message.offset);
        logger.info(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset} Done`);
      } catch (err) {
        logger.error(err);
        logger.error(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset} Error`);
      }
    });

    await Promise.allSettled(promises);
    running = false;
    await hb;
    await commitOffsetsIfNecessary();
  },
  autoCommit: true,
  autoCommitInterval: 3000,
});
```
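A different structure avoids the background heartbeat loop altogether. In the code above, if `heartbeat()` rejects (for example during a rebalance), the detached `hb` promise has no handler until `await hb` runs after all messages finish, so Node can report it as an unhandled rejection; meanwhile `running` is never consulted again and the batch keeps being processed. Below is a minimal sketch, not a definitive fix, that processes the batch in fixed-size slices: messages within a slice run in parallel, slices run one after another, offsets are resolved only for fully completed prefixes, and `heartbeat()`, `isRunning()` and `isStale()` are checked between slices. It assumes a hypothetical `handleMessage(message, batch)` that returns a promise (here it would wrap `PlanPageOccUpdation.handle(JSON.parse(message.value.toString('utf-8')))`), that each slice finishes well within the consumer's `sessionTimeout`, and that the consumer is run with `eachBatchAutoResolve: false`, since by default KafkaJS resolves the batch's last offset whenever eachBatch returns without throwing, which would mark messages skipped by an early exit as processed. `createEachBatch` and `concurrency` are illustrative names, not KafkaJS options.

```javascript
// Hypothetical factory, not part of KafkaJS. It returns an eachBatch handler
// that processes the batch in slices of `concurrency` messages: messages in a
// slice run in parallel, slices run one after another. Intended for
// consumer.run({ eachBatchAutoResolve: false, eachBatch: ... }).
function createEachBatch(handleMessage, { concurrency = 10 } = {}) {
  return async ({
    batch,
    resolveOffset,
    heartbeat,
    commitOffsetsIfNecessary,
    isRunning,
    isStale,
  }) => {
    for (let i = 0; i < batch.messages.length; i += concurrency) {
      // Stop before starting new work if the consumer is stopping or a
      // rebalance has made this batch stale.
      if (!isRunning() || isStale()) break;

      const slice = batch.messages.slice(i, i + concurrency);
      const results = await Promise.allSettled(
        slice.map((message) => handleMessage(message, batch)),
      );

      const firstFailure = results.findIndex((r) => r.status === 'rejected');
      if (firstFailure !== -1) {
        // Resolve only the messages before the failed one, then rethrow so the
        // failed message is not marked as processed.
        if (firstFailure > 0) resolveOffset(slice[firstFailure - 1].offset);
        throw results[firstFailure].reason;
      }

      // Whole slice succeeded: its last offset is safe to resolve.
      resolveOffset(slice[slice.length - 1].offset);
      // Heartbeat between slices instead of from a background loop, so no
      // timer outlives the handler and errors propagate out of eachBatch.
      await heartbeat();
    }

    // Skip the commit if the batch went stale (partition may be reassigned).
    if (!isStale()) await commitOffsetsIfNecessary();
  };
}
```

Usage would look like `consumer.run({ eachBatchAutoResolve: false, autoCommit: true, eachBatch: createEachBatch(handleMessage, { concurrency: 20 }) })`. Delivery stays at-least-once: messages after a failure in the same slice, or in an unfinished batch after a rebalance, can be processed again, so `PlanPageOccUpdation.handle` should still be idempotent. Capping the number of in-flight handler promises may also help with the memory growth, although the posted code alone does not prove that is the leak. If more parallelism is needed, KafkaJS's `partitionsConsumedConcurrently` option for `consumer.run` processes several partitions at once while keeping each partition in order.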