
Consumer does not rejoin group after heartbeat timeout

Open 62mkv opened this issue 2 years ago • 7 comments

Expected Behavior

Actual Behavior

We have a reactive Spring Boot application that uses reactor-kafka for its Kafka consumers and producers.

We use one KafkaReceiver per topic; each receiver is subscribed to and kept in a Spring bean field.

I observe that sometimes some or all of the underlying Consumers just stop, with an error message like the following:

"[Consumer clientId=consumer-my-service-2-11, groupId=my-service-2] Member consumer-my-service-2-11-2ebeee54-566c-4ae8-ac43-1d5710fee1fa sending LeaveGroup request to coordinator 192.168.0.224:14493 (id: 2147483619 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."

(This is the last message in the log so far; the application has been running happily for a day already since all 11 consumers got stuck in this limbo; the topic is being consumed by other pods.)
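For reference, the two settings the error message points at can be tuned via consumer properties. This is only an illustrative fragment; the values shown are examples, not recommendations (the defaults are `max.poll.interval.ms=300000` and `max.poll.records=500`):

```java
// Illustrative consumer-property overrides; merge into the map passed
// to ReceiverOptions.create(...). Values are examples only.
Map<String, Object> props = new HashMap<>();
props.put("max.poll.interval.ms", 600_000); // allow more time between poll() calls
props.put("max.poll.records", 100);         // smaller batches, faster poll loop
```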

Regardless of what the error says, shouldn't the consumer still be restarted by the library/Kafka internals? Or is it the application author's responsibility to somehow track this state and react accordingly (for example, by implementing a liveness health check around it)?

Steps to Reproduce

Possible Solution

Your Environment

  • Reactor version(s) used: 1.0.11
  • Other relevant libraries versions (eg. netty, ...): reactor-kafka: 1.3.17
  • JVM version (java -version): 11.0.16.1
  • OS and version (eg uname -a): Linux x64

62mkv avatar Apr 14 '23 15:04 62mkv

Looks like this is your SO question, too: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli

artembilan avatar Apr 14 '23 15:04 artembilan

This was originally asked on SO here: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli but as I read the code afterwards, it feels more and more like a bug :(

62mkv avatar Apr 14 '23 15:04 62mkv

I see. Any thoughts on a possible fix? Or please share with us which part of the project code you think is producing such a bug?

artembilan avatar Apr 14 '23 15:04 artembilan

Well, I didn't design the library, so it's hard to tell whether this is a bug in reactor-kafka, a bug in kafka-clients, or completely valid behaviour. I've skimmed through the ConsumerCoordinator and it looks like it's bound to rejoin if it is ever polled again. But whether it gets polled or not is, I guess, reactor-kafka's responsibility. So I am posting it here, hoping the community will set me straight if I am wrong 🙏🙏


62mkv avatar Apr 14 '23 15:04 62mkv

You need to add retry (and possibly repeat) to the pipeline: https://projectreactor.io/docs/kafka/release/reference/#_error_handling_2
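Following the linked error-handling docs, a resubscription pipeline could be sketched roughly as below. This is only a sketch: `receiverOptions` is assumed to be configured as in the question, the generic types are placeholders, and the backoff values are illustrative.

```java
import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

// On error (e.g. after the consumer left the group), retryWhen resubscribes,
// which creates a fresh underlying Consumer that rejoins the group.
Flux<ReceiverRecord<Object, Object>> inbound =
        KafkaReceiver.create(receiverOptions)
                .receive()
                .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
                        .maxBackoff(Duration.ofSeconds(30)))
                // also resubscribe if the Flux ever completes normally
                .repeat();
```

The point is that retry/repeat operates on the receive Flux itself, so a terminated consumer is replaced by a new subscription rather than left in limbo.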

garyrussell avatar Apr 17 '23 15:04 garyrussell

@garyrussell I see. Thanks so much for your help!

I will give it a try and close the ticket when I can confirm it's no longer manifesting.

May I also ask about a different thing: how useful would it be, to have a partition revoke handler commit offsets?

        var receiverOptions = ReceiverOptions.create(getConsumerProperties())
                                             .commitInterval(DEFAULT_COMMIT_INTERVAL)
                                             .addAssignListener(this::handlePartitionsAssignment)
                                             .addRevokeListener(this::handlePartitionsRevoking)
                                             .subscription(activeDestinations);
...
    @SneakyThrows(IllegalAccessException.class)
    private void handlePartitionsRevoking(Collection<ReceiverPartition> revokedPartitions) {
        var consumer = (Consumer<Object, Object>) FieldUtils.readField(kafkaReceiver.consumerHandler(), "consumer", true);

        try {
            consumer.commitSync(latestAcks, Duration.ofMillis(1000));
        } catch (Exception e) {
            log.warn("Ignored error on partition revoke", e);
        }
    }

It seems that the documentation says that the downstream consumer should not be concerned with this:

All acknowledged offsets are committed when partitions are revoked during rebalance and when the receive Flux is terminated

So is this quoted code in the handler, presented above, some legacy code that could/should be removed? Or might it still be needed? We occasionally see a WakeupException inside this handler, so I thought I'd ask some experts..

62mkv avatar Apr 17 '23 16:04 62mkv

I don't know what latestAcks is there but, indeed, the CommitEvent is run when partitions are revoked, so any acknowledged records will be committed. See ConsumerEventLoop.onPartitionsRevoked:

https://github.com/reactor/reactor-kafka/blob/d1aa41b99d2993d1980d0a76561efb4a0f60d139/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java#L161-L166
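Given that behavior, the reflection-based commitSync in the revoke listener looks redundant. A minimal sketch of the same options without the manual commit, reusing the names from the snippet above (getConsumerProperties, DEFAULT_COMMIT_INTERVAL, activeDestinations are assumed from the question):

```java
// Sketch: acknowledged offsets are committed by reactor-kafka itself when
// partitions are revoked (ConsumerEventLoop.onPartitionsRevoked runs the
// CommitEvent), so the revoke listener can simply observe the event.
var receiverOptions = ReceiverOptions.create(getConsumerProperties())
        .commitInterval(DEFAULT_COMMIT_INTERVAL)
        .addAssignListener(this::handlePartitionsAssignment)
        .addRevokeListener(partitions ->
                log.info("Partitions revoked: {}", partitions))
        .subscription(activeDestinations);
```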

garyrussell avatar Apr 17 '23 19:04 garyrussell