kafkajs

Don't re-join when consumption has stopped

Open • ascandella opened this issue 1 year ago • 1 comment

We're seeing consumers rebalance erroneously even after we've asked them to shut down during a deploy. Debug logs are below. I've tested this branch with my consumers and verified that, if they detect a rebalance while shutting down, they no longer try to participate in it. Instead they correctly leave the group, so the deploy can continue.

2023/05/08 09:22:24 [consumer-2] info: stop consumer group {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:24.532Z","logger":"kafkajs","message":"stop consumer group","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:24.532Z"}

# some time passes while they finish processing their batch

2023/05/08 09:22:41 [consumer-2] info: Request Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.628Z","logger":"kafkajs","message":"Request Heartbeat(key: 12, version: 3)","broker":"localhost:9092","correlationId":12,"expectResponse":true,"size":157},"timestamp":"2023-05-08T16:22:41.628Z"}
2023/05/08 09:22:41 [consumer-2] warn: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"ERROR","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"size":10},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"payload":{"type":"Buffer","data":"[filtered]"}},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopping fetchers... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopping fetchers..."},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: waiting for consumer to finish... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"waiting for consumer to finish...","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] warn: The group is rebalancing, re-joining {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"WARN","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"The group is rebalancing, re-joining","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","error":"The group is rebalancing, so a rejoin is needed"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: consumer.rebalancing {"name":"analytics-kafka-consumer","className":"Kafka","methodName":"logEvent","data":{"groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","eventId":10},"timestamp":"2023-05-08T16:22:41.632Z"}

I'm not sure whether this should be a flag that consumers can set, but I can't think of a reason you'd want a consumer to rejoin a group while it's in the process of exiting.
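A minimal sketch of the proposed behaviour, as a simplified model rather than the actual kafkajs runner code. `GroupClient`, `Runner`, `onRebalanceInProgress` and the `rejoinWhileStopping` flag are hypothetical names chosen for illustration; the flag only shows how the opt-in option floated above might look. When a heartbeat reports that the group is rebalancing, a member that is still consuming rejoins, while a member that has stopped leaves the group.

```typescript
// Minimal sketch of "don't rejoin once consumption has stopped".
// GroupClient, Runner, onRebalanceInProgress and rejoinWhileStopping are
// hypothetical names for illustration; they are NOT kafkajs APIs.

interface GroupClient {
  join(): Promise<void>; // re-enter the group for the new generation
  leave(): Promise<void>; // leave so the rebalance can complete without this member
}

type RebalanceOutcome = "rejoined" | "left";

class Runner {
  private running = true;
  private readonly group: GroupClient;
  // Hypothetical opt-in flag: when true, keep the old "always rejoin" behaviour.
  private readonly rejoinWhileStopping: boolean;

  constructor(group: GroupClient, rejoinWhileStopping = false) {
    this.group = group;
    this.rejoinWhileStopping = rejoinWhileStopping;
  }

  // Called when shutdown is requested (e.g. during a deploy).
  stop(): void {
    this.running = false;
  }

  // Called when a heartbeat fails with "The group is rebalancing, so a rejoin is needed".
  async onRebalanceInProgress(): Promise<RebalanceOutcome> {
    if (!this.running && !this.rejoinWhileStopping) {
      // Consumption has stopped: don't take part in the new generation.
      await this.group.leave();
      return "left";
    }
    // Still consuming (or explicitly opted in): rejoin as before.
    await this.group.join();
    return "rejoined";
  }
}
```

Defaulting the flag to `false` makes leaving the group the normal shutdown path, which is the behaviour described for this branch; the flag would only matter to someone who genuinely wants a stopping consumer to rejoin.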

ascandella • May 08 '23 17:05

Would love to see this functionality in the mainline branch!

lkakol • Aug 14 '23 15:08