vertx-kafka-client icon indicating copy to clipboard operation
vertx-kafka-client copied to clipboard

Kafka client consumer handler() not receive message after exception

Open fangwohung opened this issue 4 years ago • 2 comments

I'm using vertx-kafka-client 3.9.1. When i meant to send a message ("abc") to consumer, i got an exception in handler function. After that, i tried to send another message ("xyz"), but my handler function didnt receive the message. Is it a bug ? It should have received a new message despite exception in handler function.

consumer.handler(record -> {
            consumer.rxCommit()
                    .subscribe(() -> logger.debug("Committed successfully!"),
                            throwable -> logger.error("Kafka commitment error", throwable));
            System.out.println(record.value());
            Integer.parseInt(record.value());
});

fangwohung avatar Jul 14 '20 07:07 fangwohung

I got the same issue with the Rxified version. I was using a Json Deserializer for consumer, when I produce a non-Json value, I got a SerializationException. Even though, I caught the exception, the next values are nowhere to find.

consumer
        .subscribe(kafkaTopic)
        .toObservable()
        .flatMap(someOerartor)
        .doOnError(
            error -> {
               // do some error Handling
              })
        .doOnError(error -> {})
        .subscribe(logger::info, logger::warn);

ngnhatdinh2 avatar Apr 14 '22 09:04 ngnhatdinh2

You may be facing the same problem as: https://github.com/vert-x3/vertx-kafka-client/issues/220

You have a "poison pill" in your topic, and in that case (be it with the standard Kafka client or the Vert.x Kafka client) you have head-of-the-line-blocking.

One strategy is, in the error handling function to check the class of the Exception, and if it's a RecordDeserializationException , get the topic, the partition and the offset, and manually commit it. This way, future polling made by the consumer will retrieve more records.

#220 has code samples (not Rx, but this is translatable) to do so.

Hope this helps.

aesteve avatar Mar 18 '23 20:03 aesteve