kafkajs
kafkajs copied to clipboard
Commit resolved offsets on group rebalance
We should commit the already resolved offsets when we get REBALANCE_IN_PROGRESS
. Currently, we are dropping the offsets to perform the rebalance.
https://github.com/tulios/kafkajs/blob/11d7b8108ebe7dba0f5419c319ef4d94ba31e195/src/consumer/runner.js#L121-L123
consumer#heartbeat
can throw the error and skip the commit phase, which is done by the fetch
function.
https://github.com/tulios/kafkajs/blob/11d7b8108ebe7dba0f5419c319ef4d94ba31e195/src/consumer/runner.js#L174
I think it should be something like:
// ...
try {
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
if (e.type === 'REBALANCE_IN_PROGRESS') {
await this.consumerGroup.commitOffsets()
}
throw e
}
In this way, we "save" the work already done before we rebalance
Is this feature implemented or not?
Not to my knowledge.
No, currently it drops the uncommitted offsets.
@tulios : Is this issue valid for eachMessage implementation with default values of autoCommitInterval and autoCommitThreshold as well?
According to Kafka Protocol it should submit uncommitted offsets before rejoining. This way it will not double process uncommitted offsets.
I was scared about this issue but found that this is already fixed in master
.
See
https://github.com/tulios/kafkajs/blob/master/src/consumer/runner.js#L207-L216