kafkajs

Need help: safe parallel eachBatch (duplicates + rebalances + memory leak)

Open · yudhees opened this issue 4 months ago · 0 comments

Problem Summary

We are trying to use eachBatch() to process the messages inside each batch in parallel, while keeping heartbeats going in the background to avoid rebalances.
However, this leads to the following problems:

  • Duplicate message processing, even after resolveOffset() and commitOffsetsIfNecessary() have succeeded for the message
  • Frequent partition rebalances while a batch is being processed
  • Memory usage that keeps growing over time (a possible leak)
  • A heartbeat loop that sometimes seems to get stuck or is not cleaned up
  • Offsets being replayed after a rebalance, even though commitOffsetsIfNecessary() was called
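One cause we suspect for the duplicates and the replay after a rebalance (not yet confirmed): our reading of the kafkajs source is that `resolveOffset()` stores the offset it is given rather than the highest one seen. When messages finish out of order, the resolved offset is then whichever message finished last, which can be behind messages that already completed (replayed later) or ahead of a message that failed (skipped). Also, with `eachBatchAutoResolve` at its default of `true`, kafkajs resolves the last offset of the batch once `eachBatch` returns without throwing, which would cover messages that failed in our `catch` block. Below is a sketch of tracking completion per partition and only resolving the end of the gap-free run of finished offsets; `ContiguousOffsetTracker` is a hypothetical helper of ours, not a kafkajs API:

```javascript
// Hypothetical helper (not a kafkajs API). Tracks which offsets of one
// partition's batch have finished and reports the last offset of the
// gap-free prefix: every offset up to and including it has completed.
// kafkajs exposes offsets as strings, so BigInt keeps 64-bit values exact.
class ContiguousOffsetTracker {
  constructor(offsets) {
    this.pending = offsets.map((o) => BigInt(o)); // batch order (ascending)
    this.done = new Set(); // finished offsets not yet part of the prefix
    this.index = 0; // how many leading offsets are known to be finished
  }

  // Record that one message finished successfully.
  markDone(offset) {
    this.done.add(BigInt(offset));
  }

  // Highest offset that is safe to pass to resolveOffset(), or null if the
  // first offset of the batch has not finished (or has failed).
  safeOffset() {
    while (
      this.index < this.pending.length &&
      this.done.has(this.pending[this.index])
    ) {
      this.done.delete(this.pending[this.index]);
      this.index += 1;
    }
    return this.index === 0 ? null : this.pending[this.index - 1].toString();
  }
}

// Example: offset 102 finishes first, but nothing can be resolved yet.
const tracker = new ContiguousOffsetTracker(["100", "101", "102", "103"]);
tracker.markDone("102");
console.log(tracker.safeOffset()); // null (100 and 101 still in flight)
tracker.markDone("100");
console.log(tracker.safeOffset()); // 100
tracker.markDone("101");
console.log(tracker.safeOffset()); // 102 (103 still in flight)
```

In eachBatch this would mean one tracker built from `batch.messages.map((m) => m.offset)`, `markDone(message.offset)` after a successful `handle()`, and `resolveOffset(safeOffset())` whenever the result is not null, presumably with `eachBatchAutoResolve: false`. The trade-off is that a failed message holds the commit back (at-least-once) instead of being skipped.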

Our Current Consumer Code

await consumer.run({
  eachBatch: async ({
    batch,
    commitOffsetsIfNecessary,
    resolveOffset,
    heartbeat,
    pause,
    isRunning,
    isStale
  }) => {
    let running = true;

    // Heartbeat loop to prevent rebalance
    const hb = (async () => {
      while (running) {
        await heartbeat();
        await new Promise((r) => setTimeout(r, 1000));
      }
    })();

    // Parallel message processing
    const promises = batch.messages.map(async (message) => {
      logger.info(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset}`);
      const payload = message.value.toString('utf-8');

      try {
        await PlanPageOccUpdation.handle(JSON.parse(payload));
        resolveOffset(message.offset);
        logger.info(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset} Done`);
      } catch (err) {
        logger.error(err);
        logger.error(`Partition ${batch.partition} Topic ${batch.topic} Offset ${message.offset} Error`);
      }
    });

    await Promise.allSettled(promises);

    running = false;
    await hb;

    await commitOffsetsIfNecessary();
  },
  autoCommit: true,
  autoCommitInterval: 3000,
});
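A restructuring we are considering (an untested sketch; `withHeartbeat`, `forEachLimit` and `sleep` are our own hypothetical helpers, not kafkajs APIs). In the code above, if `heartbeat()` rejects (for example while a rebalance is in progress), the `hb` promise rejects but nothing awaits it until every message has settled, which Node may report as an unhandled rejection. Separately, `batch.messages.map` starts every message of the batch at once, which could account for part of the memory growth. The sketch stops the heartbeat loop in `finally`, records heartbeat errors instead of letting them escape, and caps how many messages are in flight:

```javascript
// Hypothetical helpers (not kafkajs APIs).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Calls `heartbeat()` every `intervalMs` while `work` runs. `work` receives a
// function reporting whether a heartbeat has failed (e.g. during a rebalance).
async function withHeartbeat(heartbeat, work, intervalMs = 1000) {
  let running = true;
  let heartbeatError = null;
  let wake = () => {};
  const loop = (async () => {
    while (running) {
      try {
        await heartbeat();
      } catch (err) {
        heartbeatError = err; // recorded, not rethrown from a detached promise
        return;
      }
      if (!running) break;
      // Interruptible sleep, so stopping does not wait a full interval.
      await new Promise((resolve) => {
        const timer = setTimeout(resolve, intervalMs);
        wake = () => {
          clearTimeout(timer);
          resolve();
        };
      });
    }
  })();
  try {
    return await work(() => heartbeatError !== null);
  } finally {
    running = false;
    wake();
    await loop; // never rejects: heartbeat errors are caught inside the loop
  }
}

// Runs `handler` over `items` with at most `limit` calls in flight.
// `shouldStop()` is checked before each item, so a stale batch stops taking
// new work. The first handler error is rethrown after in-flight calls finish.
async function forEachLimit(items, limit, handler, shouldStop = () => false) {
  let next = 0;
  let failed = false;
  let firstError;
  const worker = async () => {
    while (!failed && next < items.length && !shouldStop()) {
      const item = items[next++];
      try {
        await handler(item);
      } catch (err) {
        if (!failed) {
          failed = true;
          firstError = err;
        }
      }
    }
  };
  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  if (failed) throw firstError;
}
```

Inside eachBatch, the idea would be `withHeartbeat(heartbeat, (heartbeatFailed) => forEachLimit(batch.messages, 10, handleMessage, () => !isRunning() || isStale() || heartbeatFailed()))`, where `handleMessage` and the limit of 10 are placeholders. Whether stopping early on a failed heartbeat is the right reaction in kafkajs is one of the things we would like confirmed.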

yudhees · Dec 06 '25 06:12