quarkus
Application exit on dns issues when using smallrye-reactive-messaging-kafka
Description
Hey, I'm running a Quarkus application on platform version 2.10.2.Final.
My application logic receives messages and then dispatches them to a Kafka producer.
Although message dispatch is optional in my application, the application fails to start because of DNS issues we have from time to time in the cluster.
MRE
This is the configuration
# Kafka general configs
kafka.bootstrap.servers=${SOME_DOMAIN_URI}
kafka.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# SmallRye - Kafka
mp.messaging.outgoing.channel-name.connector=smallrye-kafka
mp.messaging.outgoing.channel-name.topic=topic-name
When I start my application, KafkaProducer creation sometimes fails and I receive the following stack trace:
2022-08-09 15:40:21,225 ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:442)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
at com.app.AppMain.main(AppMain.kt:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
... 46 more
2022-08-09 15:40:21,372 ERROR [io.qua.run.Application] (main) Failed to start application (with profile prod): org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:292)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:96)
at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaProducer.<init>(ReactiveKafkaProducer.java:57)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSink.<init>(KafkaSink.java:88)
at io.smallrye.reactive.messaging.kafka.KafkaConnector.getSubscriberBuilder(KafkaConnector.java:237)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder$$superforward1(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass$$function$$9.apply(Unknown Source)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.proceed(AroundInvokeInvocationContext.java:53)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor.intercept(DuplicatedContextConnectorFactoryInterceptor.java:39)
at io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor_Bean.intercept(Unknown Source)
at io.quarkus.arc.impl.InterceptorInvocation.invoke(InterceptorInvocation.java:41)
at io.quarkus.arc.impl.AroundInvokeInvocationContext.perform(AroundInvokeInvocationContext.java:40)
at io.quarkus.arc.impl.InvocationContexts.performAroundInvoke(InvocationContexts.java:32)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_Subclass.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getSubscriberBuilder(Unknown Source)
at io.smallrye.reactive.messaging.providers.impl.ConnectorFactories.lambda$wrap$5(ConnectorFactories.java:84)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.createSubscriber(ConfiguredChannelFactory.java:189)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:144)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:105)
at io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory_ClientProxy.initialize(Unknown Source)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:200)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager_ClientProxy.start(Unknown Source)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_7f54e4b27c1b49e5e062caa58f1e82797fa01393.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:323)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:305)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:73)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:130)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:99)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:103)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:67)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:41)
at com.app.AppMain.main(AppMain.kt:11)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.doRun(QuarkusEntryPoint.java:60)
at io.quarkus.bootstrap.runner.QuarkusEntryPoint.main(QuarkusEntryPoint.java:31)
It happens because we sometimes fail to resolve an IP address from ${SOME_DOMAIN_URI}.
As Kafka is not a mandatory part of our application, I would prefer to see these errors while dispatching the events; I would not want the entire application to crash in such cases. I do understand why general KafkaProducer configuration errors should crash the application (for example, when a bad port is supplied), but I'd expect some configuration option to make it more fault tolerant regarding DNS resolution.
Implementation ideas
Swallow and log config exceptions at https://github.com/quarkusio/quarkus/blob/main/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/SmallRyeReactiveMessagingLifecycle.java#L41 or at least supply some configuration to do so.
/cc @Ladicek, @alesj, @cescoffier, @jmartisk, @michalszynkiewicz, @ozangunalp, @phillip-kruger, @radcortez
Hello @davebarda,
Were you able to find a solution or workaround? We are facing a similar issue while using Kafka with SmallRye. Any suggestion would be really helpful.
Hey @Aravinda93, we did manage a hack: a listener that runs before the messaging lifecycle and verifies that DNS resolution works. It doesn't guarantee that DNS will keep working, but it is enough for almost every case. Something like this:
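// kafkaServers (list of "host:port" strings), dnsResolveTimeoutSeconds and logger
// are fields of the enclosing bean.
// Blocks startup until at least one bootstrap server resolves or the JVM shuts down.
fun onApplicationStart(@Observes @Priority(Interceptor.Priority.PLATFORM_BEFORE) event: StartupEvent) {
    // check() always runs; assert() is skipped unless the JVM is started with -ea
    check(kafkaServers.isNotEmpty()) { "kafka.bootstrap.servers is not set" }
    while (noneDnsResolvable()) {
        logger.error(
            "Failed to resolve kafka address, retrying in $dnsResolveTimeoutSeconds seconds," +
                " kafkaServers: $kafkaServers"
        )
        Thread.sleep(Duration.ofSeconds(dnsResolveTimeoutSeconds).toMillis())
        if (ApplicationLifecycleManager.isVmShuttingDown()) {
            break
        }
    }
}

// Returns true when none of the configured servers can be resolved.
private fun noneDnsResolvable(): Boolean {
    return kafkaServers.all { kafkaServerStr ->
        // Each entry is expected to look like "host:port"
        val kafkaServerSplit = kafkaServerStr.split(":")
        val host = kafkaServerSplit[0]
        val port = kafkaServerSplit[1].toInt()
        InetSocketAddress(host, port).isUnresolved
    }
}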
Note that this observer's priority places it ahead of the one set by the SmallRye lifecycle, so it runs first.
This is tricky, as it will eventually mark the application unhealthy (as the Kafka connection could not be established). There is no concept of optionality, and I don't think it should be added.
However, you can disable the channel using the "enabled=false" attribute.
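For the outgoing channel from the configuration at the top of this thread, that attribute would presumably be set as in the sketch below. The channel name is taken from that configuration; adjust it to your own channel.

```properties
# Disables the outgoing channel entirely, so no Kafka producer is created for it
mp.messaging.outgoing.channel-name.enabled=false
```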
There is a difference between an unhealthy app and an app that crashes on startup. I'll also give two reasons why it should be added:
- When the connection is unavailable (not because of DNS), the app does not crash; it just logs connection errors, which is a different behavior.
- The health of the connection can be controlled using
mp.messaging.incoming.your-channel.health-readiness-enabled=false
mp.messaging.outgoing.your-channel.health-readiness-enabled=false
The same should apply to any type of connection issue that pops up on connector creation.
On startup the application is going to retry to connect (I don't have the number of attempts, but it's a huge amount if I remember correctly). However, a non-resolvable DNS name may shortcut the retry loop.
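To illustrate why an unresolvable name skips the retry loop: the stack trace above points at ClientUtils.parseAndValidateAddresses, which runs inside the KafkaProducer constructor, before any connection attempt is made. The following is a simplified imitation of that kind of check, not the Kafka client's actual code. The names validateBootstrapServers and BootstrapConfigException and the injectable isResolvable parameter are hypothetical and exist only for this sketch.

```kotlin
import java.net.InetSocketAddress

// Hypothetical stand-in for the client's ConfigException.
class BootstrapConfigException(message: String) : RuntimeException(message)

// Simplified imitation of bootstrap address validation: unresolved hosts are
// skipped, and an empty result is reported as a configuration error.
fun validateBootstrapServers(
    servers: List<String>,
    // The resolver is injectable so the logic can be exercised without real DNS.
    isResolvable: (host: String, port: Int) -> Boolean =
        { host, port -> !InetSocketAddress(host, port).isUnresolved }
): List<String> {
    val resolved = servers
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .filter { server ->
            val host = server.substringBeforeLast(":")
            val port = server.substringAfterLast(":", "").toIntOrNull()
                ?: throw BootstrapConfigException("Invalid url in bootstrap.servers: $server")
            isResolvable(host, port)
        }
    // No retry happens here: if nothing resolves, construction fails immediately.
    if (resolved.isEmpty()) {
        throw BootstrapConfigException("No resolvable bootstrap urls given in bootstrap.servers")
    }
    return resolved
}
```

Because the failure is raised during construction, a single resolvable entry in the list is enough to get past this step.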
Exactly, this is why I believe it should be solved; as you can see, I'm not the only one facing this problem. Regarding the suggestion to disable the channel, is it possible to enable it dynamically in the application?
No, you can't disable channels dynamically. Channels are wired at build time.
The Apache Kafka client throws a ConfigException, considered not recoverable by the client. That's why there is no retry. We cannot do much.
You can introduce a profile in your application disabling the channel.
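A profile-scoped override might look like the sketch below. The profile name no-kafka is hypothetical, and the channel name comes from the configuration at the top of this thread. The profile is selected when the application is started or built, not per DNS failure.

```properties
# Applies only when the hypothetical "no-kafka" profile is active,
# for example via -Dquarkus.profile=no-kafka
%no-kafka.mp.messaging.outgoing.channel-name.enabled=false
```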
I know that's what the client throws, but it doesn't mean that a Kafka ConfigException should crash the Quarkus application. I don't see how a profile helps with this specific issue; after all, I can't know at build time whether there will be a DNS issue or not.
Quarkus requires that the Kafka producer/consumer are initialized correctly. That's a requirement of reactive messaging.
In your case, you may want to use the bare Kafka producer directly (be aware of the threading model, buffering, and acks), and handle the exception.
@ozangunalp WDYT?
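A minimal sketch of the "use the bare producer and handle the exception" idea, under stated assumptions: LazyClient and tryUse are hypothetical helpers, not part of Quarkus, SmallRye, or the Kafka client. The factory is a placeholder; in a real application it would construct the producer, for example `{ KafkaProducer<String, ByteArray>(props) }`. The sketch does not address the threading, buffering, and acks concerns mentioned above.

```kotlin
// Generic holder: creates the client on first use and retries creation on
// later calls if it failed, instead of failing at application startup.
class LazyClient<T : Any>(private val factory: () -> T) {
    @Volatile
    private var client: T? = null

    // Returns the client, or null if it cannot be created right now.
    @Synchronized
    fun getOrNull(): T? {
        client?.let { return it }
        return try {
            factory().also { client = it }
        } catch (e: Exception) {
            System.err.println("Client creation failed, will retry on next use: ${e.message}")
            null
        }
    }
}

// Runs the action against the client if available; returns false when the
// client could not be created or the action threw.
fun <T : Any> LazyClient<T>.tryUse(action: (T) -> Unit): Boolean {
    val c = getOrNull() ?: return false
    return try {
        action(c)
        true
    } catch (e: Exception) {
        System.err.println("Dispatch failed: ${e.message}")
        false
    }
}
```

With this shape, a DNS failure shows up as a logged error and a false return value at dispatch time, and the next dispatch attempts to create the client again.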
I don't think that swallowing the Kafka client ConfigException or reactive messaging deployment exception is a good idea.
I guess it would be possible to initialize producer/consumer clients lazily for reactive messaging channels, but it is a fairly deep refactoring.
@ozangunalp I do agree that lazy initialization is a better solution, as it's cleaner and might also cover other issues that could pop up. Anyway, I do think that being opinionated about this (IMHO non-intuitive) behavior is a quick win here.
@davebarda ok I've drafted something, will reference you to a PR later today
@ozangunalp laziness would make the health checks and metrics a bit more complex. How do you know if you are ready if you don't know if we can connect?
On the consumer side, we already ignore the health check if there are no subscribers. But on the producer side indeed there would be a change of behaviour. I was thinking of forcing the initialization behind a flag.
What I find challenging is the channel closing sequence.
Lazy-initialized Kafka clients were merged in #29176 and can be enabled with the config flag lazy-client=true on incoming or outgoing channels.
Closing this one.
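For the outgoing channel from the original report, the flag would presumably be set as follows. The channel name comes from the configuration at the top of this thread, and the flag is only available in versions that include the change from #29176.

```properties
# Defers Kafka producer creation for this channel instead of creating it at startup
mp.messaging.outgoing.channel-name.lazy-client=true
```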