vertx-kafka-client
NullPointerException if Kafka producer has partitions that have no leader
If Kafka is in an unhealthy state and has partitions that currently don't have a leader, the KafkaProducer will crash with a NullPointerException. I believe those partitions should be ignored in that case.
Stack trace that occurs in that case:
ERROR i.v.c.i.ContextImpl - Unhandled exception {}
java.lang.NullPointerException: null
at io.vertx.kafka.client.common.impl.Helper.from(Helper.java:87) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
at io.vertx.kafka.client.producer.impl.KafkaProducerImpl.lambda$partitionsFor$7(KafkaProducerImpl.java:165) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:79) ~[vertx-core-3.5.3.jar:3.5.3]
at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:289) ~[vertx-core-3.5.3.jar:3.5.3]
at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339) ~[vertx-core-3.5.3.jar:3.5.3]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.19.Final.jar:4.1.19.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.19.Final.jar:4.1.19.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.19.Final.jar:4.1.19.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.19.Final.jar:4.1.19.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.19.Final.jar:4.1.19.Final]
Thanks for reporting, I'll look into this.
@ppatierno Following this post, I think we should have checks in the loop below:
for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo: done.result()) {
PartitionInfo partitionInfo = new PartitionInfo();
partitionInfo
.setInSyncReplicas(
Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
.setLeader(Helper.from(kafkaPartitionInfo.leader())) // -> WE MAY NEED TO CHECK IF THIS VALUE IS NULL
.setPartition(kafkaPartitionInfo.partition())
.setReplicas(
Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
.setTopic(kafkaPartitionInfo.topic());
partitions.add(partitionInfo);
}
handler.handle(Future.succeededFuture(partitions));
} else {
handler.handle(Future.failedFuture(done.cause()));
}
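For illustration only, here is a minimal sketch of what that guard could look like, keeping the rest of the mapping unchanged (a suggestion, not the actual fix in the library):
for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : done.result()) {
  PartitionInfo partitionInfo = new PartitionInfo();
  partitionInfo
      .setInSyncReplicas(
          Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
      .setPartition(kafkaPartitionInfo.partition())
      .setReplicas(
          Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
      .setTopic(kafkaPartitionInfo.topic());
  // leader() can be null when the partition currently has no leader: only map it when present
  if (kafkaPartitionInfo.leader() != null) {
    partitionInfo.setLeader(Helper.from(kafkaPartitionInfo.leader()));
  }
  partitions.add(partitionInfo);
}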
We also need to fix that logic in KafkaConsumerImpl.
Should I make a PR with the approach above?
In my case, done.result() throws a NullPointerException when I call rxPartitions for a non-existing topic with auto.create.topics.enable=false. I think the exception should be caught and Future.failedFuture(ex.getCause()) should be called with the error.
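A rough sketch of that idea, assuming the mapping loop stays as it is today (simplified: here the caught exception itself is passed to the failed future rather than its cause):
try {
  for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : done.result()) {
    // ... map kafkaPartitionInfo into a Vert.x PartitionInfo as in the excerpt above ...
  }
  handler.handle(Future.succeededFuture(partitions));
} catch (Exception ex) {
  // surface the failure to the caller instead of letting the NPE escape as an unhandled exception on the context
  handler.handle(Future.failedFuture(ex));
}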
Hi, I'm having the same issue and I also came to the conclusion that it is done.result() that returns null, so the loop throws a java.lang.NullPointerException and the handler never gets notified of the error.
My stack trace:
2022-02-15 14:14:11 ERROR [ContextImpl:] Unhandled exception
java.lang.NullPointerException: null
at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$partitionsFor$8(KafkaConsumerImpl.java:466) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$null$1(KafkaReadStreamImpl.java:130) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100) ~[vertx-core-4.2.4.jar:4.2.4]
at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63) ~[vertx-core-4.2.4.jar:4.2.4]
at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38) ~[vertx-core-4.2.4.jar:4.2.4]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-common-4.1.73.Final.jar:4.1.73.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.73.Final.jar:4.1.73.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) [netty-transport-4.1.73.Final.jar:4.1.73.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.73.Final.jar:4.1.73.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.73.Final.jar:4.1.73.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.73.Final.jar:4.1.73.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
ping @ppatierno
I have to find the time to finally take a look at this issue :-(
Has anyone managed to reproduce this issue?
I tried a few things like producer.write(...) to a topic that does not exist, together with auto.create.topics.enable set to false. All I get is a failed future (therefore the .result() is null, but that's expected) but no crash, unfortunately.
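For reference, the kind of reproduction attempt I mean looks roughly like this (a sketch only: broker address and topic name are placeholders, the usual Vert.x Kafka client imports and a running Vertx instance named vertx are assumed, and auto.create.topics.enable=false is a broker-side setting, not client config):
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

// the broker runs with auto.create.topics.enable=false, so this topic is never created
producer.write(KafkaProducerRecord.create("topic-that-does-not-exist", "key", "value"))
  .onFailure(err -> System.out.println("write failed: " + err.getMessage()));

producer.partitionsFor("topic-that-does-not-exist")
  .onSuccess(partitions -> System.out.println("partitions: " + partitions))
  .onFailure(err -> System.out.println("partitionsFor failed: " + err.getMessage()));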
I understand the fix proposed by @anhldbk (and it looks like the right one), and we could implement it safely, but I'd like to reproduce the issue first.
In fact, if the infamous "metadata is not present after ..." error happens, it seems to be handled well.
And I can't find a way to have the Kafka standard client return a null List. It seems it'll either return an empty List or throw an exception, even if the cluster is in an unhealthy state.
Which leads me to think that yes, we could potentially add a band-aid null check, but the problem might be buried a bit deeper.