pekko-connectors-kafka
Apache Pekko-based streaming application is idle and not consuming from Kafka topic post-rebalancing
Issue: Apache Pekko-based Streaming Application Idle After Kafka Rebalancing
Description: Our application, which runs on Kubernetes with KEDA scaling based on consumer group lag, has encountered issues post-rebalancing. During testing, the application scales up and down as expected. However, after a rebalance, certain Kafka topic partitions are no longer being consumed.
We observed that while the consumer is assigned multiple partitions, some partitions are paused and never resumed, resulting in unprocessed messages. The application only resumes consuming these pending messages when there is another surge of input messages, which triggers pod scaling and, subsequently, another rebalance.
Can you isolate whether this is a pekko-connectors-kafka issue or whether it's due to the underlying Kafka client?
Also, we need to know which version of pekko-connectors-kafka you are using. It is recommended to use pekko-connectors-kafka 1.1.0 if you can.
It is exactly the same issue as described here: https://github.com/akka/alpakka-kafka/pull/194
It is not an underlying client issue but something to do with the rebalance, which pauses some partitions and never resumes them. I checked the KafkaConsumerActor file, which has the poll method where, for some partitions, we never call the resume method. I confirmed this behaviour in the debug logs.
I am using pekko-connectors-kafka 1.0.0. I am actually running with Scala 2.12. Can I use 1.1.0 with Scala 2.12?
We are experiencing exactly the same behavior.
Currently using pekko-connectors-kafka 1.1.0, while overriding the kafka-clients dependency to 3.8.1 due to https://issues.apache.org/jira/browse/KAFKA-17227.
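For reference, pinning the client looks roughly like this in sbt (a sketch; the coordinates are the standard Maven ones, the version numbers are just the ones mentioned above):

```scala
// build.sbt (sketch): pekko-connectors-kafka 1.1.0 with kafka-clients forced to 3.8.1
libraryDependencies += "org.apache.pekko" %% "pekko-connectors-kafka" % "1.1.0"

// Either add kafka-clients explicitly at the desired version...
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.8.1"

// ...or keep the transitive dependency and override its version:
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.8.1"
```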
Scala 2.13.16.
Are there plans to pick up this issue, or is this awaiting funding / external contributions? We very much appreciate all the effort that goes into this repo and would like to have the issue fixed, so we can keep using pekko-connectors-kafka as a reliable building block.
@roy-tc We don't have any volunteers who are focused on this component. The link to https://github.com/akka/alpakka-kafka/pull/194 is a bit misleading, as that code is also part of pekko-connectors-kafka. Maybe the fix did not cover all scenarios.
One option is to try to tweak some of the consumer configs: https://github.com/apache/pekko-connectors-kafka/blob/6ee7684055961d4c5ae3808102361f8c44cb2baf/core/src/main/resources/reference.conf#L51
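For example (an untested sketch with purely illustrative values; the real defaults live in the linked reference.conf under pekko.kafka.consumer), the poll and stop timings can be tuned either in application.conf or programmatically on ConsumerSettings:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("example")

// Starts from the pekko.kafka.consumer section of application.conf/reference.conf
// and overrides a few knobs in code. Bootstrap servers and group id are placeholders.
val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("my-group")
    .withPollInterval(50.millis)   // how often the consumer actor polls the KafkaConsumer
    .withPollTimeout(50.millis)    // timeout handed to KafkaConsumer.poll
    .withStopTimeout(30.seconds)   // grace period for in-flight messages on shutdown
    .withProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000") // raw kafka-clients setting
```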
Also, it would be useful to provide stack dumps and/or logs from the nodes with paused consumers.
@pjfanning thanks for the quick reply! We were able to reproduce the behavior; however, with INFO logging we only observed that the consumer's logs simply stopped. We're currently in the process of reproducing the issue with DEBUG / TRACE logging enabled and will post our results here. We'll also try to tweak the consumer configs and return with our findings.
It would also be useful to know which type of consumer is in use. This module supports a lot of different options. Some are better suited to cluster rebalancing than others.
https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#choosing-a-consumer
We are seeing the same issue with committablePartitionedManualOffsetSource
@pjfanning unfortunately we were not able to extract any useful logging. For now, our workaround will be to restart services, based on active alerting, whenever their Kafka consumer gets stuck.
It would also be useful to know which type of consumer is in use.
We are currently using a committableSource, subscribed to a topic (no specific partition). As far as we could assess, this fits our needs best, as we prefer at-least-once delivery. In the documentation you linked, I couldn't find which consumer would be more resilient to cluster rebalancing; what would you advise?
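Roughly, the shape is the standard committableSource plus Committer.sink pattern from the docs (a simplified sketch with placeholder names, not our exact code):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.{Committer, Consumer}
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("my-group")                // placeholder

def handle(value: String): Future[Unit] = Future.unit // placeholder business logic

// At-least-once: process each record, then commit its offset through Committer.sink.
val control: DrainingControl[_] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("my-topic")) // placeholder topic
    .mapAsync(4)(msg => handle(msg.record.value()).map(_ => msg.committableOffset))
    .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply)
    .run()
```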
Thanks again!
This is not a component that I currently use, so take any suggestions I make with a grain of salt. I notice there is a committablePartitionedSource which is partition aware; it might behave better in a rebalance scenario.
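An untested sketch of that shape, adapted from the documented per-partition pattern (placeholder names throughout):

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import org.apache.pekko.kafka.scaladsl.{Committer, Consumer}
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("example")
import system.dispatcher

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder
    .withGroupId("my-group")                // placeholder

def handle(value: String): Future[Unit] = Future.unit // placeholder business logic

// One sub-source per assigned partition; sub-sources complete when a partition is revoked
// during a rebalance, and new ones are emitted for newly assigned partitions.
val control =
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics("my-topic"))
    .mapAsyncUnordered(16) { case (topicPartition, partitionSource) =>
      partitionSource
        .mapAsync(1)(msg => handle(msg.record.value()).map(_ => msg.committableOffset))
        .runWith(Committer.sink(CommitterSettings(system)))
    }
    .toMat(Sink.ignore)(DrainingControl.apply)
    .run()
```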
FYI, after a long period of observation, we were able to find the root cause in our own code that calls the pekko-connectors-kafka lib, not in the lib itself. Due to incorrect use of KillSwitches, service instances could not properly resubscribe consumers when our health check flapped between healthy and unhealthy.
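For anyone who hits something similar, the shape we moved towards is roughly the following (a hypothetical sketch, not our exact code; ConsumerLifecycle, startStream, onHealthy and onUnhealthy are made-up names): re-materialize a fresh consumer stream each time the service becomes healthy, and shut down through the stream's own DrainingControl instead of reusing a KillSwitch that has already been triggered.

```scala
import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
import scala.concurrent.{ExecutionContext, Future}

// Sketch of a lifecycle wrapper around the consumer stream. `startStream` builds and runs
// a brand-new stream (e.g. the committableSource pipeline sketched earlier in this thread).
final class ConsumerLifecycle(startStream: () => DrainingControl[_])(implicit ec: ExecutionContext) {

  @volatile private var control: Option[DrainingControl[_]] = None

  // Called when the health check flips to healthy: materialize a fresh stream if none is running.
  def onHealthy(): Unit = synchronized {
    if (control.isEmpty) control = Some(startStream())
  }

  // Called when the health check flips to unhealthy: drain in-flight messages and stop,
  // using the stream's own control object rather than a shared KillSwitch.
  def onUnhealthy(): Future[Unit] = synchronized {
    val stopping = control.map(_.drainAndShutdown().map(_ => ())).getOrElse(Future.unit)
    control = None
    stopping
  }
}
```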
Many thanks for the support nonetheless!