Lambda Snapstart kafka connection errors

Open hamburml opened this issue 1 year ago • 11 comments

Describe the bug

Copied from https://github.com/quarkusio/quarkus/issues/42286, since this may be a better place for it :)

Hi,

We use SnapStart on our Quarkus Lambdas. Some of them use smallrye-messaging to write messages to or receive messages from Kafka. This works as expected, but our logs contain warnings that the connection to a Kafka node was lost, due to either an authentication error or a firewall block:

    "loggerClassName": "org.apache.kafka.common.utils.LogContext$LocationAwareKafkaLogger",
    "loggerName": "org.apache.kafka.clients.NetworkClient",
    "level": "WARN",
    "message": "[Producer clientId=kafka-producer-event-xxxx] Connection to node xx (hxxxx.amazonaws.com/xxx:9096) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",

As far as I know, the whole memory of a started Quarkus Lambda is stored after the init phase, and when the Lambda is reused, that memory is loaded back to skip the init phase. This also means that pooled connections are "stored", but in reality they are already closed.

I thought I simply needed to close all open Kafka connections before the snapshot is created. I did this with an org.crac.Resource and its beforeCheckpoint method. The warnings in the log are now gone, but it looks like no new connections are initiated, so all messages sent via a channel fail. I also tried KafkaProducer::flush, but that didn't help.

Any ideas?

@ApplicationScoped
@Slf4j
public class KafkaHelper implements Resource {

    @Inject
    KafkaClientService kafkaClientService;

    void onStart(@Observes StartupEvent ev) {
        Core.getGlobalContext().register(this);
    }

    @Override
    public void beforeCheckpoint(org.crac.Context<? extends Resource> context)
            throws Exception {
        log.info("kafkaproducer {}", kafkaClientService.getProducerChannels());
        log.info("kafkaconsumer {}", kafkaClientService.getConsumerChannels());

        log.info("going to sleep");
        var listOfProducer = kafkaClientService.getProducerChannels().stream()
                .map(kafkaClientService::getProducer)
                .map(KafkaProducer::flush) // with KafkaProducer::close log warnings are gone but all future messages fail
                .toList();

        Uni.combine().all().unis(listOfProducer)
                .combinedWith(unused -> null)
                .await().atMost(Duration.ofSeconds(10));
        log.info("going to sleep 2");
    }
    @Override
    public void afterRestore(org.crac.Context<? extends Resource> context)
            throws Exception {

        // is there a 'init connection' method?
        log.info("i am back");

    }
}

I found https://github.com/quarkusio/quarkus/issues/31401 which is the same issue but with database connections.

hamburml avatar Aug 05 '24 15:08 hamburml

Our Kafka support does not support SnapStart or CRaC. The way Kafka works makes it very hard to snapshot. I would recommend, for safety reasons, to only initialize after the restore.

cescoffier avatar Aug 19 '24 15:08 cescoffier

Thanks for your reply! There is an initial PR, https://github.com/apache/kafka/pull/13619, which tries to handle CRaC, but it looks like there is not much interest in it.

> I would recommend, for safety reasons, to only initialize after the restore.

Exactly, this is what I want. I do not need a snapshot of a working Kafka client; I need a method to call on the Kafka client so that it reconnects and/or verifies its current connections. This would drop the old connections that are gone (because they existed when the snapshot was taken) and create new ones. Can you point me to a method that I could call in the afterRestore method?

hamburml avatar Aug 19 '24 15:08 hamburml

You cannot use reactive messaging for that, but you can create a low-level Kafka client in afterRestore, or create a lazy producer and not use it during the snapshot phase (so basically, initialize it during the first HTTP call).
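
The "lazy producer" idea can be sketched roughly as follows. This is a minimal illustration using only the JDK, not code from SmallRye or Kafka: `LazyClient` is a hypothetical helper, and the factory passed to it stands in for whatever actually builds the producer. The client is created on the first `get()` and can be dropped with `reset()`, so no open connection has to exist while the snapshot is taken.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

/**
 * Hypothetical holder (not a SmallRye or Kafka class): creates the client on
 * first use and can drop it again so that no open connection is snapshotted.
 */
final class LazyClient<T extends Closeable> {
    private final Supplier<T> factory;
    private T instance; // null until the first get(), and again after reset()

    LazyClient(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the client, creating it on the first call after construction or reset. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    synchronized boolean isInitialized() {
        return instance != null;
    }

    /** Closes and forgets the current client, if any; the next get() builds a fresh one. */
    synchronized void reset() {
        if (instance == null) {
            return;
        }
        try {
            instance.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            instance = null;
        }
    }
}
```

In a setup like the one discussed here, the factory might build a low-level Kafka producer, and `reset()` could be called from `beforeCheckpoint` in case the client was touched during init. That wiring is an assumption and has not been tested against SnapStart.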

cescoffier avatar Aug 19 '24 15:08 cescoffier

Hm yeah, but I still want to use this dependency...

hamburml avatar Aug 19 '24 17:08 hamburml

If it's only to produce, you can use the lazy feature (@ogunalp it should delay the initialization of the producer right?)

cescoffier avatar Aug 19 '24 17:08 cescoffier

Indeed, I forgot about the lazy-client flag. It should work for producers. And maybe even for consumers combined with pausable-channels, but I need to check.

ozangunalp avatar Aug 22 '24 09:08 ozangunalp

Thanks, I'll try it with lazy-client and come back to you.

hamburml avatar Aug 23 '24 06:08 hamburml

lazy-client worked! Thanks. The snapshot is now created without a Kafka connection, so there is no exception anymore.
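
For readers looking for the setting: a configuration along these lines should enable it for an outgoing channel. The channel name `events` is made up for illustration, and the exact property layout assumes the usual `mp.messaging.outgoing.<channel>.<attribute>` form, so check it against the connector documentation for your version.

```properties
# Hypothetical outgoing channel named "events"
mp.messaging.outgoing.events.connector=smallrye-kafka
# Create the Kafka client on first use instead of at startup
mp.messaging.outgoing.events.lazy-client=true
```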

hamburml avatar Sep 06 '24 10:09 hamburml

Sorry, closed it. @ozangunalp mentioned a test with pausable-channels.

hamburml avatar Sep 06 '24 10:09 hamburml

@hamburml is there a test repository that you can share? I suspect that injecting a @Channel would work, because the subscription is lazy too. But consuming in an @Incoming channel would still create the client at startup.

ozangunalp avatar Sep 06 '24 11:09 ozangunalp

@ozangunalp Not right now. I'll try to prepare one this evening. My service only writes into a @Channel, and it now works without exceptions :) However, I cannot prepare it with working CRaC, because I was not able to get a JDK that creates snapshots working on my end. I let AWS Lambda do this for me.

hamburml avatar Sep 25 '24 14:09 hamburml