vertx-kafka-client
vertx-kafka-client copied to clipboard
Kafka client consumer handler() not receive message after exception
I'm using vertx-kafka-client 3.9.1. When i meant to send a message ("abc") to consumer, i got an exception in handler function. After that, i tried to send another message ("xyz"), but my handler function didnt receive the message. Is it a bug ? It should have received a new message despite exception in handler function.
consumer.handler(record -> {
consumer.rxCommit()
.subscribe(() -> logger.debug("Committed successfully!"),
throwable -> logger.error("Kafka commitment error", throwable));
System.out.println(record.value());
Integer.parseInt(record.value());
});
I got the same issue with the Rxified version.
I was using a Json Deserializer for consumer
, when I produce a non-Json value, I got a SerializationException
. Even though, I caught the exception, the next values are nowhere to find.
consumer
.subscribe(kafkaTopic)
.toObservable()
.flatMap(someOerartor)
.doOnError(
error -> {
// do some error Handling
})
.doOnError(error -> {})
.subscribe(logger::info, logger::warn);
You may be facing the same problem as: https://github.com/vert-x3/vertx-kafka-client/issues/220
You have a "poison pill" in your topic, and in that case (be it with the standard Kafka client or the Vert.x Kafka client) you have head-of-the-line-blocking.
One strategy is, in the error handling function to check the class of the Exception, and if it's a RecordDeserializationException
, get the topic, the partition and the offset, and manually commit it. This way, future polling made by the consumer will retrieve more records.
#220 has code samples (not Rx, but this is translatable) to do so.
Hope this helps.