vertx-kafka-client
vertx-kafka-client copied to clipboard
NullPointerException when using consumer batchHandler
Version
4.5.7
Context
I saw there were exceptions thrown when using the kafka consumer with a batch handler.
Do you have a reproducer?
/**
 * Reproducer for the {@code NullPointerException} logged by the Kafka consumer when only a
 * batch handler is registered.
 *
 * <p>Each test starts a Kafka container, publishes {@link #MESSAGE_COUNT} records to
 * {@link #TOPIC}, and then consumes them through {@code KafkaConsumer#batchHandler}.
 */
@Testcontainers
@ExtendWith(VertxExtension.class)
class KafkaBatchConsumerTest {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchConsumerTest.class);

  /** Number of records published before each test and expected in the consumed batch. */
  private static final int MESSAGE_COUNT = 3;

  /** Topic the records are written to and read from. */
  private static final String TOPIC = "test.topic";

  @Container
  private KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

  /**
   * Publishes {@link #MESSAGE_COUNT} string records ("Message 0", "Message 1", ...) with a
   * {@code null} key to partition 0 of {@link #TOPIC}, and completes the test context once
   * every write has succeeded.
   */
  @BeforeEach
  void prepare(Vertx vertx, VertxTestContext testContext) {
    KafkaProducer<String, String> producer = KafkaProducer.create(vertx, producerSettings());

    // Create and send each record in one step, then wait for all pending writes together.
    Future.all(
        IntStream.range(0, MESSAGE_COUNT)
          .mapToObj(seq -> producer.write(
            KafkaProducerRecord.<String, String>create(TOPIC, null, "Message " + seq, 0)))
          .collect(Collectors.toList()))
      .onComplete(testContext.succeedingThenComplete());
  }

  /**
   * Subscribes a consumer that has only a batch handler and passes as soon as a batch
   * containing exactly {@link #MESSAGE_COUNT} records is delivered.
   */
  @Test
  void testBatchConsumer(Vertx vertx, VertxTestContext testContext) {
    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerSettings());

    consumer.batchHandler(batch -> {
      int batchSize = batch.size();
      LOG.info("Got {} records", batchSize);
      for (int position = 0; position < batchSize; position++) {
        LOG.info("Got record: {}", batch.recordAt(position));
      }
      // Assertion failures raised inside verify() are reported to the test context.
      testContext.verify(() -> {
        Assertions.assertEquals(MESSAGE_COUNT, batchSize);
        testContext.completeNow();
      });
    });

    consumer.subscribe(TOPIC);
  }

  /** Producer configuration: container bootstrap servers plus string key/value serializers. */
  private Properties producerSettings() {
    Properties settings = new Properties();
    settings.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return settings;
  }

  /**
   * Consumer configuration: container bootstrap servers, string key/value deserializers,
   * group {@code test.group}, and offset reset to {@code earliest}.
   */
  private Properties consumerSettings() {
    Properties settings = new Properties();
    settings.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    settings.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group");
    settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return settings;
  }
}
The test passes and consuming works, but in the logs I can see NullPointerExceptions, one for each message:
java.lang.NullPointerException: Cannot invoke "io.vertx.core.Handler.handle(Object)" because the return value of "io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.tracedHandler(io.vertx.core.Context, io.vertx.core.Handler)" is null
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$run$10(KafkaReadStreamImpl.java:240)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:166)
at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:209)
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:240)
at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:194)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
Steps to reproduce
Create a kafka consumer with a batch handler and subscribe to a topic.