kafkajs
Don't re-join when consumption has stopped
We're seeing an issue where consumers erroneously take part in a rebalance even after we've asked them to shut down during a deploy. The debug logs below show the sequence. I've tested this branch with my consumers and verified that, when they detect a rebalance while shutting down, they no longer try to participate in it. Instead, they correctly leave the group so the deploy can continue.
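The behaviour described above can be modeled with a small, self-contained TypeScript sketch. It is not the kafkajs implementation: `ShutdownAwareRunner`, `HeartbeatError`, and `onHeartbeatError` are hypothetical names. The only idea carried over is a runner that remembers whether consumption has stopped and, if so, leaves the group instead of rejoining when a heartbeat reports a rebalance.

```typescript
// Hypothetical sketch of the guard this PR describes; NOT kafkajs source code.
// ShutdownAwareRunner, HeartbeatError and onHeartbeatError are illustrative names.

// In the Kafka protocol this condition is REBALANCE_IN_PROGRESS (error code 27).
interface HeartbeatError {
  type: string;
  message: string;
}

type RebalanceAction = "rejoin" | "leave";

class ShutdownAwareRunner {
  private running = true;
  readonly events: string[] = [];

  // Called when the application asks the consumer to stop (e.g. during a deploy).
  stop(): void {
    this.running = false;
    this.events.push("stop consumer group");
  }

  // Decides how to react when a heartbeat reports that the group is rebalancing.
  onHeartbeatError(err: HeartbeatError): RebalanceAction {
    if (err.type !== "REBALANCE_IN_PROGRESS") {
      throw new Error(`unhandled heartbeat error: ${err.message}`);
    }
    if (!this.running) {
      // Consumption has stopped: leave the group rather than taking part in
      // the rebalance, so shutdown (and the deploy) can continue.
      this.events.push("rebalance detected while stopping; leaving group");
      return "leave";
    }
    this.events.push("The group is rebalancing, re-joining");
    return "rejoin";
  }
}
```

With this model, a running consumer still answers a rebalance with `"rejoin"`, while the same heartbeat error after `stop()` yields `"leave"`, which is the behaviour change the PR is after.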
2023/05/08 09:22:24 [consumer-2] info: stop consumer group {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:24.532Z","logger":"kafkajs","message":"stop consumer group","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:24.532Z"}
# some time passes, they finish processing their batch
2023/05/08 09:22:41 [consumer-2] info: Request Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.628Z","logger":"kafkajs","message":"Request Heartbeat(key: 12, version: 3)","broker":"localhost:9092","correlationId":12,"expectResponse":true,"size":157},"timestamp":"2023-05-08T16:22:41.628Z"}
2023/05/08 09:22:41 [consumer-2] warn: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"ERROR","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"size":10},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Response Heartbeat(key: 12, version: 3) {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Connection","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.631Z","logger":"kafkajs","message":"Response Heartbeat(key: 12, version: 3)","broker":"localhost:9092","error":"The group is rebalancing, so a rejoin is needed","correlationId":12,"payload":{"type":"Buffer","data":"[filtered]"}},"timestamp":"2023-05-08T16:22:41.631Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopping fetchers... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopping fetchers..."},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: waiting for consumer to finish... {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"waiting for consumer to finish...","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: Stopped fetchers {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"FetchManager","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"DEBUG","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"Stopped fetchers"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] warn: The group is rebalancing, re-joining {"name":"analytics-kafka-consumer","className":"KafkaClient","methodName":"Runner","data":{"clientId":"8b8882b7-b751-4e95-8bd6-1be20a397171","label":"WARN","timestamp":"2023-05-08T16:22:41.632Z","logger":"kafkajs","message":"The group is rebalancing, re-joining","groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","error":"The group is rebalancing, so a rejoin is needed"},"timestamp":"2023-05-08T16:22:41.632Z"}
2023/05/08 09:22:41 [consumer-2] info: consumer.rebalancing {"name":"analytics-kafka-consumer","className":"Kafka","methodName":"logEvent","data":{"groupId":"analytics-kafka-consumer","memberId":"8b8882b7-b751-4e95-8bd6-1be20a397171-ccecf5e6-7899-461f-b476-7d90f4c2779d","eventId":10},"timestamp":"2023-05-08T16:22:41.632Z"}
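To check whether a consumer hits this path, a hypothetical helper like the one below can scan log lines in the format shown above (date, time, client label, level, text, then a JSON payload) and report whether "The group is rebalancing, re-joining" appears after "stop consumer group". The function names are illustrative, and the parsing assumes each JSON payload starts at the first `{` on the line and carries the kafkajs message under `data.message`, as in these logs.

```typescript
// Hypothetical log-scanning helper; the line format is taken from the logs above.
interface LogPayload {
  data?: { message?: string };
}

// Extracts data.message from the JSON payload that follows the log prefix.
function messageOf(line: string): string | undefined {
  const start = line.indexOf("{");
  if (start === -1) return undefined; // e.g. a "# some time passes" comment
  try {
    const payload = JSON.parse(line.slice(start)) as LogPayload;
    return payload.data?.message;
  } catch {
    return undefined; // payload was not valid JSON
  }
}

// True if the consumer tried to re-join after it was told to stop.
function rejoinedAfterStop(lines: string[]): boolean {
  let stopped = false;
  for (const line of lines) {
    const msg = messageOf(line);
    if (msg === "stop consumer group") {
      stopped = true;
    } else if (stopped && msg === "The group is rebalancing, re-joining") {
      return true;
    }
  }
  return false;
}
```

Run against the full log above, this reports `true`; with the fix applied, the re-joining line should no longer follow the stop message.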
I'm not sure whether this should be a flag that consumers can opt into, but I can't think of a reason you'd want a consumer to rejoin a group while it is in the process of exiting.
Would love to see this functionality in the mainline branch!