Application exits on DNS issues when using smallrye-reactive-messaging-kafka

Open davebarda opened this issue 3 years ago • 1 comment

Description

Hey, I'm running a Quarkus application on platform version 2.10.2.Final. My application logic receives messages and then dispatches them to a Kafka producer. Although message dispatch is optional in my application, the application sometimes fails to start because of DNS issues we occasionally have in the cluster.

MRE

This is the configuration:

# Kafka general configs
kafka.bootstrap.servers=${SOME_DOMAIN_URI}
kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# SmallRye - Kafka
mp.messaging.outgoing.channel-name.connector=smallrye-kafka
mp.messaging.outgoing.channel-name.topic=topic-name

When I start my application, KafkaProducer creation sometimes fails and I get the following stack trace:

2022-08-09 15:40:21,225 ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
        at com.app.AppMain.main(AppMain.kt:11)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
        ... 46 more

2022-08-09 15:40:21,372 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
        at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
        at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
        at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
        at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
        at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
        at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
        at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
        at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
        at com.app.AppMain.main(AppMain.kt:11)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
        at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)

This happens because we sometimes fail to resolve the IP address of ${SOME_DOMAIN_URI}. As Kafka is not a mandatory part of our application, I would prefer to see these errors when dispatching events rather than have the entire application crash. I do understand why general KafkaProducer configuration errors (for example, a bad port) should crash the application, but I'd expect some configuration option that makes it more fault tolerant with respect to DNS resolution.
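The failure mode can be illustrated with a simplified, stdlib-only sketch of the validation the Kafka client performs while constructing the producer: each bootstrap.servers entry is parsed and resolved, unresolvable hosts are skipped, and construction fails only when none remain. This is an approximation for illustration, not the actual ClientUtils code; validateBootstrapServers, BootstrapConfigException and the injectable resolve parameter are hypothetical names.

```kotlin
import java.net.InetSocketAddress

// Hypothetical stand-in for org.apache.kafka.common.config.ConfigException
class BootstrapConfigException(message: String) : RuntimeException(message)

// Simplified approximation of how the Kafka client validates bootstrap.servers.
// `resolve` is injectable so the logic can be exercised without real DNS;
// by default the InetSocketAddress constructor performs the lookup.
fun validateBootstrapServers(
    bootstrapServers: String,
    resolve: (String, Int) -> InetSocketAddress = { host, port -> InetSocketAddress(host, port) }
): List<InetSocketAddress> {
    val resolved = bootstrapServers.split(',')
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .mapNotNull { entry ->
            // A missing or non-numeric port is a genuine configuration error
            val port = entry.substringAfterLast(':', "").toIntOrNull()
                ?: throw BootstrapConfigException("Invalid url in bootstrap.servers: $entry")
            val host = entry.substringBeforeLast(':')
            val address = resolve(host, port)
            // An unresolvable host is skipped here, not treated as fatal on its own
            if (address.isUnresolved) null else address
        }
    // Only when no entry resolves does construction fail, as in the stack trace above
    if (resolved.isEmpty()) {
        throw BootstrapConfigException("No resolvable bootstrap urls given in bootstrap.servers")
    }
    return resolved
}
```

In the reported configuration, bootstrap.servers holds a single DNS name, so one failed lookup at startup is enough for the producer constructor to throw.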

Implementation ideas

Swallow and log config exceptions at https://github.com/quarkusio/quarkus/blob/main/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java#L41 or at least supply some configuration to do so.

davebarda, Aug 09 '22 13:08

/cc @Ladicek, @alesj, @cescoffier, @jmartisk, @michalszynkiewicz, @ozangunalp, @phillip-kruger, @radcortez

quarkus-bot[bot], Aug 09 '22 13:08

Hello @davebarda,

Were you able to find a solution or workaround? We are facing a similar issue when using Kafka with SmallRye. Any suggestion would be really helpful.

Aravinda93, Aug 16 '22 08:08

Hey @Aravinda93, we managed to work around it with a hack: a startup observer that runs before the reactive messaging lifecycle and waits until the Kafka bootstrap servers' DNS names resolve. It doesn't guarantee that DNS will keep working, but it is enough in almost every case. Something like this:

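import io.quarkus.runtime.ApplicationLifecycleManager
import io.quarkus.runtime.StartupEvent
import java.net.InetSocketAddress
import java.time.Duration
import javax.annotation.Priority          // jakarta.* packages on Quarkus 3
import javax.enterprise.event.Observes
import javax.interceptor.Interceptor

// Members of a CDI bean. kafkaServers (List<String> from kafka.bootstrap.servers),
// dnsResolveTimeoutSeconds (Long) and logger are assumed to be defined on that bean.
fun onApplicationStart(@Observes @Priority(Interceptor.Priority.PLATFORM_BEFORE) event: StartupEvent) {
    // check() is always evaluated; Kotlin's assert() only runs when the JVM is started with -ea
    check(kafkaServers.isNotEmpty()) { "kafka.bootstrap.servers is not set" }

    // Block startup until at least one bootstrap server resolves, or the JVM is shutting down
    while (noneDnsResolvable()) {
        logger.error("Failed to resolve kafka address, retrying in $dnsResolveTimeoutSeconds seconds," +
                " kafkaServers: $kafkaServers")

        Thread.sleep(Duration.ofSeconds(dnsResolveTimeoutSeconds).toMillis())

        if (ApplicationLifecycleManager.isVmShuttingDown()) {
            break
        }
    }
}

// True when none of the bootstrap servers can be resolved
private fun noneDnsResolvable(): Boolean =
    kafkaServers.all { server ->
        // Split on the last ':' so the port is taken from the end of the entry
        val host = server.trim().substringBeforeLast(':')
        val port = server.trim().substringAfterLast(':').toInt()
        // The InetSocketAddress constructor attempts a DNS lookup; isUnresolved reports a failure
        InetSocketAddress(host, port).isUnresolved
    }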

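Note that this observer uses a lower priority value than the SmallRye lifecycle observer. CDI notifies observers with lower priority values first, so this check runs before the Kafka clients are created.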

davebarda, Aug 16 '22 10:08

This is tricky, as it will eventually mark the application as unhealthy (since the Kafka connection could not be established). There is no concept of an optional channel, and I don't think it should be added.

However, you can disable the channel using the "enabled=false" attribute.

cescoffier, Aug 17 '22 09:08

There is a difference between an unhealthy app and an app that crashes on startup. Here are two reasons why it should be added:

  1. When the connection is unavailable for reasons other than DNS, the app doesn't crash; it only logs connection errors, which is different from the DNS case.
  2. The health check of the connection can already be controlled using
mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false

The same should apply to any type of connection issue that comes up when the connector is created.

davebarda, Aug 17 '22 10:08

On startup, the application retries the connection (I don't remember the exact number of attempts, but if I recall correctly it's a large number). However, a non-resolvable DNS name may short-circuit the retry loop.

cescoffier, Aug 18 '22 13:08

Exactly, this is why I believe it should be solved; as you can see, I'm not the only one facing this problem. Regarding the suggestion to disable the channel: is it possible to enable it dynamically in the application?

davebarda, Aug 18 '22 13:08

No, you can't disable channels dynamically. Channels are wired at build time.

cescoffier, Aug 18 '22 14:08

The Apache Kafka client throws a ConfigException, which the client considers non-recoverable. That's why there is no retry, and we cannot do much about it.
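The distinction between recoverable and non-recoverable errors can be sketched generically: a retry loop keeps retrying transient failures but gives up on the first error classified as a configuration error, which is why an unresolvable bootstrap host never reaches any retry logic. This is a hypothetical illustration; retryUnlessFatal, TransientException and FatalConfigException are not Kafka or SmallRye APIs.

```kotlin
// Hypothetical exception types standing in for retriable and non-retriable client errors
class TransientException(message: String) : RuntimeException(message)
class FatalConfigException(message: String) : RuntimeException(message)

// Retries `action` up to maxAttempts times on transient failures, sleeping between
// attempts; a fatal configuration error propagates on its first occurrence.
fun <T> retryUnlessFatal(maxAttempts: Int, delayMillis: Long, action: (attempt: Int) -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastError: TransientException? = null
    for (attempt in 1..maxAttempts) {
        try {
            return action(attempt)
        } catch (e: TransientException) {
            // e.g. broker temporarily unreachable: remember the error and try again
            lastError = e
            if (attempt < maxAttempts) Thread.sleep(delayMillis)
        }
        // FatalConfigException is deliberately not caught, so it short-circuits the loop
    }
    // Every attempt failed transiently; surface the last failure
    throw lastError!!
}
```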

You can introduce a configuration profile in your application that disables the channel.

cescoffier, Aug 18 '22 14:08

I know that's what the client throws, but that doesn't mean a Kafka ConfigException should crash the Quarkus application. I don't see how a profile helps with this specific issue, since I can't know at build time whether there will be a DNS issue.

davebarda, Aug 18 '22 14:08

Quarkus requires that the Kafka producer/consumer are initialized correctly. That's a requirement in reactive messaging.

In your case, you may want to use the bare Kafka producer directly (be aware of the threading model, buffering, and acks), and handle the exception.
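Handling the exception yourself can be sketched without Kafka on the classpath: the client is created eagerly, and a construction failure turns dispatch into a logged no-op instead of aborting startup. MessageSender and OptionalDispatcher are hypothetical names; with a real KafkaProducer, the factory would wrap its constructor, and the threading, buffering, and acks caveats above still apply.

```kotlin
import java.util.logging.Logger

// Hypothetical minimal client abstraction standing in for a bare KafkaProducer
fun interface MessageSender {
    fun send(payload: ByteArray)
}

// Creates the client eagerly at startup but treats a construction failure as
// "messaging unavailable" instead of letting it abort the application.
class OptionalDispatcher(factory: () -> MessageSender) {
    private val log = Logger.getLogger(OptionalDispatcher::class.java.name)

    private val sender: MessageSender? = try {
        factory()
    } catch (e: Exception) {
        log.warning("Message dispatch disabled, client creation failed: ${e.message}")
        null
    }

    val available: Boolean get() = sender != null

    // Returns false when the message was dropped because no client could be created
    fun dispatch(payload: ByteArray): Boolean {
        val s = sender ?: return false
        s.send(payload)
        return true
    }
}
```

This sketch never retries creation, so a DNS outage at startup disables dispatch for the lifetime of the process.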

cescoffier, Aug 18 '22 17:08

@ozangunalp WDYT?

cescoffier, Aug 29 '22 06:08

I don't think that swallowing the Kafka client ConfigException or the reactive messaging deployment exception is a good idea. I guess it would be possible to initialize producer/consumer clients lazily for reactive messaging channels, but it would require a fairly deep refactoring.
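Lazy initialization of a client can be sketched independently of SmallRye: the client is created on first use rather than at startup, and a failed creation is reported to that caller and retried on the next call. LazyClient is a hypothetical name; this is a generic double-checked-locking holder, not the implementation that was later merged.

```kotlin
// Lazily creates a client on first use and retries creation on later calls if it failed,
// so a DNS outage at startup only affects the calls made while the outage lasts.
class LazyClient<T : Any>(private val factory: () -> T) {
    private val lock = Any()
    @Volatile private var instance: T? = null

    // Returns the client, creating it if needed; a creation error propagates to this caller
    fun get(): T {
        instance?.let { return it }
        synchronized(lock) {
            // Re-check inside the lock so concurrent callers create at most one client
            instance?.let { return it }
            return factory().also { instance = it }
        }
    }

    val isInitialized: Boolean get() = instance != null
}
```

Until the first successful get(), there is no client to probe, which is what makes readiness checks harder with lazily created clients.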

ozangunalp, Aug 29 '22 06:08

@ozangunalp I agree that lazy initialization is a better solution, as it's cleaner and might also cover other issues that could pop up. Anyway, I do think that addressing this (IMHO non-intuitive) behavior is a quick win here.

davebarda, Aug 29 '22 08:08

@davebarda OK, I've drafted something; I'll point you to a PR later today.

ozangunalp, Aug 29 '22 10:08

@ozangunalp Laziness would make the health checks and metrics a bit more complex. How do you know whether the application is ready if you don't know whether it can connect?

cescoffier, Aug 29 '22 13:08

On the consumer side, we already ignore the health check if there are no subscribers. But on the producer side indeed there would be a change of behaviour. I was thinking of forcing the initialization behind a flag.

What I find challenging is the channel closing sequence.

ozangunalp, Aug 29 '22 14:08

Lazy initialization of Kafka clients was merged in #29176 and can be enabled with the lazy-client=true configuration attribute on incoming or outgoing channels.

ozangunalp, Nov 21 '22 09:11

Closing this one.

ozangunalp, Nov 21 '22 09:11