Commit resolved offsets on group rebalance

Open tulios opened this issue 6 years ago • 5 comments

We should commit the already resolved offsets when we get REBALANCE_IN_PROGRESS. Currently, we are dropping the offsets to perform the rebalance.

https://github.com/tulios/kafkajs/blob/11d7b8108ebe7dba0f5419c319ef4d94ba31e195/src/consumer/runner.js#L121-L123

consumer#heartbeat can throw this error, in which case the commit phase, which is performed by the fetch function, is skipped.

https://github.com/tulios/kafkajs/blob/11d7b8108ebe7dba0f5419c319ef4d94ba31e195/src/consumer/runner.js#L174

I think it should be something like:

// ...
try {
  await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
  if (e.type === 'REBALANCE_IN_PROGRESS') {
    await this.consumerGroup.commitOffsets()
  }
  throw e
}

This way, we "save" the work already done before we rebalance.
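
To make the intended behavior concrete, here is a minimal, self-contained sketch of the same pattern run against a stand-in consumer group. `FakeConsumerGroup`, `heartbeatAndCommitOnRebalance`, and the offset fields are hypothetical names used only for illustration; they are not the kafkajs implementation, and the sketch does not address whether a broker accepts the commit while a rebalance is in progress.

```javascript
// Stand-in for the consumer group used by the runner. This is NOT kafkajs code;
// it only mimics the two calls used in the snippet above (assumption).
class FakeConsumerGroup {
  constructor() {
    this.lastResolvedOffset = 41 // work the handler has already finished
    this.lastCommittedOffset = 10 // what has been committed so far
  }

  // Always fails the way a heartbeat does while the group is rebalancing.
  async heartbeat() {
    const error = new Error('The group is rebalancing')
    error.type = 'REBALANCE_IN_PROGRESS'
    throw error
  }

  // Persist whatever has been resolved so far.
  async commitOffsets() {
    this.lastCommittedOffset = this.lastResolvedOffset
  }
}

// The proposed pattern: commit the resolved offsets, then rethrow so that the
// caller still sees the error and can rejoin the group.
async function heartbeatAndCommitOnRebalance(consumerGroup, heartbeatInterval) {
  try {
    await consumerGroup.heartbeat({ interval: heartbeatInterval })
  } catch (e) {
    if (e.type === 'REBALANCE_IN_PROGRESS') {
      await consumerGroup.commitOffsets()
    }
    throw e
  }
}

async function demo() {
  const group = new FakeConsumerGroup()
  let rethrownType = null
  try {
    await heartbeatAndCommitOnRebalance(group, 3000)
  } catch (e) {
    rethrownType = e.type
  }
  return { rethrownType, lastCommittedOffset: group.lastCommittedOffset }
}

const demoResult = demo()
demoResult.then(result => console.log(result))
```

In this sketch the error still reaches the caller, but the committed offset has caught up with the resolved one by the time it does. Note that if `commitOffsets` itself throws, the original error is replaced by the commit error, which a real implementation might want to handle.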

tulios • Jan 15 '18 16:01

Is this feature implemented or not?

thynson • Apr 29 '19 08:04

Not to my knowledge.

Nevon • Apr 29 '19 08:04

No, currently it drops the uncommitted offsets.

tulios • Apr 29 '19 08:04

@tulios: Does this issue also affect the eachMessage implementation when autoCommitInterval and autoCommitThreshold are left at their default values?

SathishKumarRamasamy • Jun 13 '20 14:06

According to the Kafka protocol, the consumer should commit its uncommitted offsets before rejoining the group. That way, messages whose offsets were resolved but not yet committed are not processed twice.
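
The double processing can be illustrated with a toy model. The sketch below assumes offsets are plain integers and that "committed" means the next offset to read after rejoining; the function and variable names are hypothetical and nothing here calls kafkajs.

```javascript
// Hypothetical model: list the offsets a consumer reads after it rejoins,
// starting from the committed position up to (not including) the high watermark.
function offsetsReadAfterRejoin(committedOffset, highWatermark) {
  const offsets = []
  for (let offset = committedOffset; offset < highWatermark; offset++) {
    offsets.push(offset)
  }
  return offsets
}

const highWatermark = 8 // the partition holds offsets 0..7
const committedBeforeRebalance = 3 // offsets 0..2 are committed
const resolvedBeforeRebalance = 6 // offsets 0..5 were already processed

// Resolved offsets dropped: the consumer restarts from the old commit.
const withoutCommit = offsetsReadAfterRejoin(committedBeforeRebalance, highWatermark)
// Resolved offsets committed first: the consumer restarts after its own work.
const withCommit = offsetsReadAfterRejoin(resolvedBeforeRebalance, highWatermark)

// Offsets handled both before and after the rebalance.
const reprocessed = withoutCommit.filter(offset => offset < resolvedBeforeRebalance)

console.log({ withoutCommit, withCommit, reprocessed })
```

Under these assumptions, dropping the resolved offsets means offsets 3 to 5 are handled a second time, while committing before the rejoin leaves only the unprocessed offsets 6 and 7.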

mark-b-ab • Jan 23 '22 11:01

I was worried about this issue, but found that it is already fixed in master. See https://github.com/tulios/kafkajs/blob/master/src/consumer/runner.js#L207-L216

dardanos • Oct 28 '22 20:10