kafkajs
When a new consumer is added, the rebalance completes multiple times, triggering the eachMessage function again
Describe the bug
When a new consumer is added, the rebalance completes multiple times, triggering the eachMessage function again. I want the next eachMessage invocation to wait until the previous one has finished, or, if a rebalance starts a new eachMessage, to be notified so I can cancel the old one.
Within the same task, the current task should complete before the next one starts. When a rebalance happens, every process re-enters immediately, regardless of whether the current task has finished. How do I ensure that only one task is running at a time?
To Reproduce
- A consumer is executing eachMessage
- A new consumer joins the group
- Rebalancing begins while the old consumer may still be executing eachMessage. Once rebalancing completes, every member immediately starts executing eachMessage again, so the old consumer now has two eachMessage invocations running
Expected behavior
After rebalancing, the old eachMessage invocation should be cancelled rather than kept running alongside the new one. When a new message needs to run after a rebalance, the previous invocation should be guaranteed to have finished.
Observed behavior
I tried to ensure that only one task runs at a time by polling in a loop until the previous message had completed:
for (const message of batch.messages) {
  if (!isRunning() || isStale()) break
  if (service.inService) {
    Logger.debug(`Unable to run multiple:${partition}-${message.offset}`)
    // Poll every second until the previous handler releases the flag
    await new Promise((resolve) => {
      const timer = setInterval(() => {
        if (!service.inService) {
          clearInterval(timer)
          resolve(true)
        }
      }, 1000)
    })
  }
  // ....
}
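Instead of polling a flag with setInterval, the handlers could be serialized with a promise chain. The following is a generic sketch of that idea, not a kafkajs API; `handle` is a hypothetical per-message handler:

```javascript
// Sketch: serialize async tasks with a promise chain instead of
// polling service.inService with setInterval.
function createMutex() {
  let last = Promise.resolve()
  return (task) => {
    // Each task starts only after the previous one has settled.
    const run = last.then(() => task())
    // Swallow failures so one rejected task does not block the chain.
    last = run.catch(() => {})
    return run
  }
}

// Usage inside eachBatch (hypothetical handler `handle`):
//   const runExclusive = createMutex()
//   for (const message of batch.messages) {
//     if (!isRunning() || isStale()) break
//     await runExclusive(() => handle(message))
//   }
```

Note that this only serializes tasks within one process; it cannot stop a second consumer instance created by a rebalance from running its own loop.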
I also tried using a queue to make sure that only one task was running:
const addQueue = (function () {
  const list = []
  let state = 'end'
  return async (call) => {
    if (state === 'end') {
      // Run the first task, then drain anything queued meanwhile
      state = 'start'
      await call()
      while (list.length) {
        const fn = list.shift()
        await fn()
      }
      state = 'end'
    } else {
      list.push(call)
    }
  }
})()
// ....
for (const message of batch.messages) {
  if (!isRunning() || isStale()) break
  await new Promise((resolve, reject) => {
    addQueue(async () => {
      // await handle ...
      resolve(true)
    })
  })
  // ....
}
Each error doubles the number of concurrent runs:
let number = 0
consumer.run({
  async eachBatch(payload) {
    const { isRunning, isStale, batch } = payload
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break
      number++
      if (number % 3) throw new Error('err')
    }
  }
})
Environment: "kafkajs": "^1.15.0"
Additional context
const demo = async (index: number, topicName) => {
const kafka: Kafka = new Kafka(KafkaConfig)
const consumer = kafka.consumer({
groupId: topicName + '12',
sessionTimeout: 1000 * 60 * 30,
heartbeatInterval: 1000 * 60 * 10
})
await consumer.connect()
await consumer.subscribe({ topic: topicName, fromBeginning: true, })
const service = {
inService: false,
handleMessage: async (value: any, call = () => { }) => {
service.inService = true
const time = setInterval(call, 1000)
await new Promise(r => setTimeout(r, Math.random() * 50000))
clearInterval(time)
service.inService = false
}
}
consumer.run({
eachBatchAutoResolve: true,
autoCommitThreshold: 1,
autoCommit: true,
async eachBatch(payload) {
const { isRunning, isStale, resolveOffset, heartbeat, batch, } = payload
const { partition, topic } = batch
for (let message of batch.messages) {
if (!isRunning() || isStale()) break
if (service.inService) {
Logger.debug(`Unable to run multiple:${partition}-${message.offset}`)
}
Logger.log(`current task:${partition}-${message.offset}`)
let error;
await service.handleMessage('', async () => {
try {
console.log(`heartbeat:${partition}-${message.offset}`)
!error && await heartbeat()
} catch (err) {
console.error('heartbeat-error')
error = err
}
})
if (error) throw error
Logger.log(`end:${partition}-${message.offset}`, ' number')
await resolveOffset(message.offset)
await payload.commitOffsetsIfNecessary(payload.uncommittedOffsets())
await heartbeat()
}
},
})
}
After waiting for the above demo to start executing eachMessage, run the demo again: a rebalance occurs, and the first process ends up with two concurrent eachMessage executions.
// Wait for the first process to start running before starting a second process.
Process 1 partial log (condensed; garbled interleaved fragments trimmed, truncations marked with …):
{"level":"INFO","timestamp":"2021-05-17T03:10:30.244Z",…,"message":"[ConsumerGroup] Starting","groupId":"city12"}
{"level":"INFO","timestamp":"2021-05-17T03:11:30.382Z",…,"message":"[ConsumerGroup] Consumer has joined the group","groupId":"city12",…,"groupProtocol":"RoundRobinAssigner","duration":60021}
[Nest] 12548 - 2021/05/17 11:11:30 AM current task:32-1 (heartbeat:32-1 …) end:32-1
[Nest] 12548 - 2021/05/17 11:11:37 AM current task:32-2 (heartbeat:32-2 …) end:32-2
[Nest] 12548 - 2021/05/17 11:12:18 AM current task:32-3 (heartbeat:32-3 …) end:32-3
[Nest] 12548 - 2021/05/17 11:12:33 AM current task:32-4 (heartbeat:32-4 …) end:32-4 +30155ms
{"level":"ERROR","timestamp":"2021-05-17T03:13:03.313Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 5)","broker":"CT-IT03.chatunet.com:9092","clientId":"mt-shop","error":"The coordinator is not aware of this member","correlationId":11,"size":28} (repeated)
[Nest] 12548 - 2021/05/17 11:13:03 AM current task:42-0 +176ms
{"level":"ERROR","timestamp":"2021-05-17T03:13:03.488Z","logger":"kafkajs","message":"[Runner] The coordinator is not aware of this member, re-joining the group","groupId":"city12","memberId":"mt-shop-a9a400a8-c253-483c-bb14-87dc8e333d7e","error":"The coordinator is not aware of this member","retryCount":0,"retryTime":292}
heartbeat:42-0 … [Nest] 12548 - 2021/05/17 11:13:43 AM end:42-0 +40314ms
{"level":"ERROR","timestamp":"2021-05-17T03:14:03.498Z",…,"error":"The coordinator is not aware of this member",…} (repeated)
[Nest] 12548 - 2021/05/17 11:14:03 AM current task:29-0 +20017ms
{"level":"INFO","timestamp":"2021-05-17T03:14:03.821Z",…,"message":"[ConsumerGroup] Consumer has joined the group","groupId":"city12","memberId":"mt-shop-955ab6d3-daf4-471f-863a-b07e26d97bc6","isLeader":true,…,"groupProtocol":"RoundRobinAssigner","duration":60332}
[Nest] 12548 - 2021/05/17 11:14:03 AM Unable to run multiple:1-0 +45ms
[Nest] 12548 - 2021/05/17 11:14:04 AM current task:1-0 +271ms
heartbeat:29-0 heartbeat:1-0 heartbeat:29-0 heartbeat:1-0 … (two eachMessage loops interleaved)
[Nest] 12548 - 2021/05/17 11:14:14 AM end:29-0 +10211ms
[Nest] 12548 - 2021/05/17 11:14:14 AM current task:29-1 (heartbeat:1-0 and heartbeat:29-1 interleaved …) end:29-1 +25011ms
[Nest] 12548 - 2021/05/17 11:14:39 AM current task:29-2 … end:29-2 +1157ms
[Nest] 12548 - 2021/05/17 11:14:40 AM current task:29-3 (heartbeat:1-0 and heartbeat:29-3 interleaved …)
[Nest] 12548 - 2021/05/17 11:14:53 AM end:1-0 +13007ms
[Nest] 12548 - 2021/05/17 11:14:53 AM current task:1-1 … end:29-3 +1030ms
[Nest] 12548 - 2021/05/17 11:14:54 AM current task:29-4 (heartbeat:1-1 and heartbeat:29-4 interleaved …)
[Nest] 12548 - 2021/05/17 11:15:03 AM end:1-1 +8481ms
[Nest] 12548 - 2021/05/17 11:15:03 AM current task:1-2 (heartbeat:29-4 and heartbeat:1-2 interleaved …) end:1-2 +17388ms
{"level":"ERROR","timestamp":"2021-05-17T03:15:20.471Z",…,"message":"[Connection] Response OffsetCommit(key: 8, version: 5)",…,"error":"The coordinator is not aware of this member",…} (repeated)
[Nest] 12548 - 2021/05/17 11:15:20 AM current task:15-0 +204ms
{"level":"ERROR","timestamp":"2021-05-17T03:15:20.674Z","logger":"kafkajs","message":"[Runner] The coordinator is not aware of this member, re-joining the group","groupId":"city12","memberId":"mt-shop-955ab6d3-daf4-471f-863a-b07e26d97bc6","error":"The coordinator is not aware of this member","retryCount":0,"retryTime":283}
heartbeat:29-4 heartbeat:15-0 heartbeat:29-4 heartbeat:15-0 … (still interleaved)
@t-d-d @Nevon @tulios help
@521707 I don't have time to look at this in detail, but the first thing I would try is not calling consumer.commitOffsets() from within your eachMessage() function - I'm not sure whether that will work.
Instead, if you want to commit after every message, use autoCommit: true and autoCommitThreshold: 1.
I updated the issue to use autoCommit: true and autoCommitThreshold: 1.
There are still outcomes that I don't want. I added the logs above.
Did you solve it?
I solved it by handling batches myself. If I receive a rebalance error on heartbeat, I stop fetching, wait for the in-flight job to finish (make sure your session timeout is longer than the time needed to process a job), commit the offsets, and rejoin.
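The approach described above might look roughly like the following sketch (my reconstruction, not the actual code; `handleMessage` is a hypothetical per-message handler, and the check assumes a rebalance surfaces from heartbeat() as an error whose type is 'REBALANCE_IN_PROGRESS'):

```javascript
// Sketch of manual batch handling: finish the in-flight job, commit,
// and stop fetching as soon as a heartbeat reports a rebalance.
// `handleMessage` is a hypothetical per-message handler.
function makeEachBatch(handleMessage) {
  return async function eachBatch({
    batch,
    resolveOffset,
    heartbeat,
    commitOffsetsIfNecessary,
    isRunning,
    isStale,
  }) {
    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break
      // Complete the job first; the session timeout must be longer
      // than the time this takes.
      await handleMessage(message)
      resolveOffset(message.offset)
      await commitOffsetsIfNecessary()
      try {
        await heartbeat()
      } catch (err) {
        // Assumption: a rebalance surfaces as an error with this type.
        // Stop fetching and let the runner rejoin the group.
        if (err.type === 'REBALANCE_IN_PROGRESS') break
        throw err
      }
    }
  }
}
```

It would be wired up with consumer.run({ eachBatchAutoResolve: false, eachBatch: makeEachBatch(handleMessage) }), so that on a rebalance the loop stops cleanly with the last resolved offset already committed.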
Hi @mark-b-ab. Could you please share the code? Thanks in advance.