vertx-kafka-client

NullPointerException if kafka producer has partitions that have no leader

Open mouse256 opened this issue 5 years ago • 7 comments

If Kafka is in an unhealthy state and has partitions that currently have no leader, the KafkaProducer crashes with a NullPointerException. I believe those partitions should be ignored in that case.

Stack trace that happens in that case:

ERROR i.v.c.i.ContextImpl - Unhandled exception {}
java.lang.NullPointerException: null
	at io.vertx.kafka.client.common.impl.Helper.from(Helper.java:87) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
	at io.vertx.kafka.client.producer.impl.KafkaProducerImpl.lambda$partitionsFor$7(KafkaProducerImpl.java:165) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
	at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:79) ~[vertx-core-3.5.3.jar:3.5.3]
	at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:289) ~[vertx-core-3.5.3.jar:3.5.3]
	at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339) ~[vertx-core-3.5.3.jar:3.5.3]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.19.Final.jar:4.1.19.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.19.Final.jar:4.1.19.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.19.Final.jar:4.1.19.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.19.Final.jar:4.1.19.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.19.Final.jar:4.1.19.Final]
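For illustration, here is a minimal, self-contained reconstruction (not the actual vert.x source) of how a missing partition leader can surface as this NullPointerException: Kafka's `PartitionInfo.leader()` returns null when a partition has no leader, and a mapping helper that dereferences the node unconditionally, like `Helper.from` appears to, will throw. The class and field names below are stand-ins.

```java
public class NullLeaderNpe {

    // Stand-in for org.apache.kafka.common.Node
    static class KafkaNode {
        final int id;
        KafkaNode(int id) { this.id = id; }
    }

    // Stand-in for the vert.x Node wrapper
    static class VertxNode {
        final int id;
        VertxNode(int id) { this.id = id; }
    }

    // Mimics a mapper with no null check on its input
    static VertxNode from(KafkaNode node) {
        return new VertxNode(node.id); // NPE here when node == null
    }

    public static void main(String[] args) {
        KafkaNode leader = null; // partition currently has no leader
        try {
            from(leader);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException");
        }
    }
}
```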

mouse256 avatar Oct 26 '18 11:10 mouse256

Thanks for reporting, I'll look into this.

ppatierno avatar Oct 26 '18 12:10 ppatierno

@ppatierno Following this post, I think we should add null checks in the loop:

if (done.succeeded()) {

	for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : done.result()) {

		PartitionInfo partitionInfo = new PartitionInfo();

		partitionInfo
			.setInSyncReplicas(
				Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
			.setLeader(Helper.from(kafkaPartitionInfo.leader())) // -> WE MAY NEED TO CHECK IF THIS VALUE IS NULL
			.setPartition(kafkaPartitionInfo.partition())
			.setReplicas(
				Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
			.setTopic(kafkaPartitionInfo.topic());

		partitions.add(partitionInfo);
	}

	handler.handle(Future.succeededFuture(partitions));
} else {
	handler.handle(Future.failedFuture(done.cause()));
}

We also need to fix that logic in KafkaConsumerImpl.

Should I make a PR with the approach above?
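As a sketch of what that null check could look like in isolation (stand-in class names, not the real vert.x API): map the leader to null when Kafka reports none, instead of letting the mapper dereference a null node.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeLeaderMapping {

    // Stand-in for org.apache.kafka.common.Node
    static class KafkaNode {
        final int id;
        KafkaNode(int id) { this.id = id; }
    }

    // Stand-in for the vert.x Node wrapper
    static class VertxNode {
        final int id;
        VertxNode(int id) { this.id = id; }
    }

    // Defensive variant of the mapper: a partition with no leader
    // stays null instead of triggering a NullPointerException
    static VertxNode from(KafkaNode node) {
        return node == null ? null : new VertxNode(node.id);
    }

    public static void main(String[] args) {
        List<KafkaNode> leaders = new ArrayList<>();
        leaders.add(new KafkaNode(1));
        leaders.add(null); // partition with no current leader

        List<VertxNode> mapped = new ArrayList<>();
        for (KafkaNode leader : leaders) {
            mapped.add(from(leader)); // no NPE: null maps to null
        }
        System.out.println(mapped.size()); // prints 2
    }
}
```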

anhldbk avatar Nov 19 '18 16:11 anhldbk

In my case, done.result() throws a NullPointerException when I call rxPartitions for a non-existing topic with auto.create.topics.enable=false.

I think the exception should be caught and Future.failedFuture(ex.getCause()) should be called so the handler receives the error.
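The pattern suggested here can be sketched with java.util.concurrent.CompletableFuture standing in for the vert.x Future (the method and class names below are hypothetical): catch the unexpected exception and fail the future, so the caller's handler is notified instead of the event loop crashing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FailFutureOnNpe {

    // Hypothetical helper: count partitions, failing the future on error
    static CompletableFuture<Integer> partitionCount(List<String> partitions) {
        CompletableFuture<Integer> fut = new CompletableFuture<>();
        try {
            // partitions may be null, e.g. for a non-existing topic
            fut.complete(partitions.size());
        } catch (Exception e) {
            // fail the future instead of letting the NPE escape
            fut.completeExceptionally(e);
        }
        return fut;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> fut = partitionCount(null);
        System.out.println(fut.isCompletedExceptionally()); // prints true
    }
}
```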

barbarosalp avatar Feb 14 '19 21:02 barbarosalp

Hi, I'm having the same issue, and I also came to the conclusion that it is done.result() that returns null; the loop therefore throws a java.lang.NullPointerException, and the handler never gets notified of the error.

My stack trace:

2022-02-15 14:14:11 ERROR [ContextImpl:] Unhandled exception
java.lang.NullPointerException: null
	at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$partitionsFor$8(KafkaConsumerImpl.java:466) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$null$1(KafkaReadStreamImpl.java:130) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
	at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100) ~[vertx-core-4.2.4.jar:4.2.4]
	at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63) ~[vertx-core-4.2.4.jar:4.2.4]
	at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38) ~[vertx-core-4.2.4.jar:4.2.4]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) [netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.73.Final.jar:4.1.73.Final]
	at java.lang.Thread.run(Thread.java:829) [?:?]

b1zzu avatar Feb 16 '22 10:02 b1zzu

ping @ppatierno

vietj avatar Feb 16 '22 11:02 vietj

I have to find the time to finally take a look at this issue :-(

ppatierno avatar Feb 17 '22 09:02 ppatierno

Has anyone managed to reproduce this issue?

I tried a few things, like producer.write(...) to a topic that does not exist together with auto.create.topics.enable set to false. All I get is a failed future (so .result() is null, but that's expected), but unfortunately no crash.

I understand the fix proposed by @anhldbk (and it looks like the right one) and we could implement it safely but I'd like to reproduce the issue first.

In fact, if the infamous "metadata is not present after ..." error happens, it seems to be handled well.

And I can't find a way to make the standard Kafka client return a null List. It seems it either returns an empty List or throws an exception, even if the cluster is in an unhealthy state.

Which leads me to think that yes, we could add a band-aid null check, but the problem might be buried a bit deeper.

aesteve avatar Mar 18 '23 19:03 aesteve