vertx-kafka-client icon indicating copy to clipboard operation
vertx-kafka-client copied to clipboard

NullPointerException when using consumer batchHandler

Open afloarea opened this issue 10 months ago • 0 comments

Version

4.5.7

Context

I saw there were exceptions thrown when using the kafka consumer with a batch handler.

Do you have a reproducer?


/**
 * Reproducer for the NullPointerException logged by the Vert.x Kafka consumer when only a
 * batch handler is registered.
 *
 * <p>Each test starts a Kafka container, publishes {@link #MESSAGE_COUNT} messages to
 * {@code test.topic} and then consumes them through {@code KafkaConsumer#batchHandler}.
 * No per-record handler is registered on purpose: that is the configuration under which the
 * exception is reported.
 */
@Testcontainers
@ExtendWith(VertxExtension.class)
class KafkaBatchConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchConsumerTest.class);
    private static final int MESSAGE_COUNT = 3;

    @Container
    private final KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    /**
     * Publishes {@link #MESSAGE_COUNT} messages to partition 0 of {@code test.topic} and completes
     * the test context once every write has been acknowledged.
     *
     * @param vertx       the Vert.x instance injected by the extension
     * @param testContext completed when all writes succeed, failed otherwise
     */
    @BeforeEach
    void prepare(Vertx vertx, VertxTestContext testContext) {
        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

        IntStream.range(0, MESSAGE_COUNT)
                .mapToObj(index ->
                        KafkaProducerRecord.<String, String>create("test.topic", null, "Message " + index, 0))
                .map(producer::write)
                .collect(Collectors.collectingAndThen(Collectors.toList(), Future::all))
                // The producer is only needed for seeding the topic; release it whether or not
                // the writes succeeded instead of keeping it open for the rest of the test.
                .onComplete(written -> producer.close())
                .onComplete(testContext.succeedingThenComplete());
    }

    /**
     * Subscribes a consumer that has only a batch handler and expects the first batch delivered
     * to contain all {@link #MESSAGE_COUNT} messages.
     *
     * @param vertx       the Vert.x instance injected by the extension
     * @param testContext completed when the expected batch arrives, failed on assertion or
     *                    subscription errors
     */
    @Test
    void testBatchConsumer(Vertx vertx, VertxTestContext testContext) {

        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group");
        // A fresh group has no committed offsets; start from the beginning so the messages
        // written in prepare() are delivered.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

        // Intentionally no consumer.handler(...) and no consumer.close() in here: the issue is
        // about what happens after the batch handler has run with no record handler present.
        consumer.batchHandler(kafkaRecords -> {
            LOG.info("Got {} records", kafkaRecords.size());
            IntStream.range(0, kafkaRecords.size())
                    .mapToObj(kafkaRecords::recordAt)
                    .forEach(kafkaRecord -> LOG.info("Got record: {}", kafkaRecord));
            testContext.verify(() -> {
                Assertions.assertEquals(MESSAGE_COUNT, kafkaRecords.size());
                testContext.completeNow();
            });
        });

        // Surface a failed subscription immediately rather than waiting for the test-context
        // timeout, which would hide the real cause.
        consumer.subscribe("test.topic")
                .onFailure(testContext::failNow);

    }

}

The test passes and consuming works, but the logs show a NullPointerException for each message:

java.lang.NullPointerException: Cannot invoke "io.vertx.core.Handler.handle(Object)" because the return value of "io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.tracedHandler(io.vertx.core.Context, io.vertx.core.Handler)" is null
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$run$10(KafkaReadStreamImpl.java:240)
	at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
	at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:166)
	at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:209)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:240)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:194)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
	at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Steps to reproduce

Create a Kafka consumer, register only a batch handler (no per-record handler), and subscribe to a topic.

afloarea avatar Apr 05 '24 18:04 afloarea