vertx-kafka-client

Consumer goes into an infinite loop when SerializationException is thrown

Open cyhii opened this issue 2 years ago • 6 comments

Questions

I wrote a Kafka consumer to consume JSON messages, so I use JsonObjectDeserializer. My configuration is:

        val kafkaConfig: Map<String, String> = mapOf(
            "bootstrap.servers" to config.getString("kafka.bootstrap.servers"),
            "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" to "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
            "group.id" to "my_group",
            "auto.offset.reset" to "latest",
            "enable.auto.commit" to "true",
        )

But sometimes, when a non-JSON message is produced to the topic, the consumer goes into an infinite loop at the position of that message.

I read the source code and found these lines in io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.java:

    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
      if (this.polling.compareAndSet(false, true)) {
        this.worker.submit(() -> {
          boolean submitted = false;
          try {
            if (!this.closed.get()) {
              try {
                ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
                if (records != null && records.count() > 0) {
                  submitted = true; // sets false only when the iterator is overwritten
                  this.context.runOnContext(v -> {
                    this.polling.set(false);
                    handler.handle(records);
                  });
                }
              } catch (WakeupException ignore) {
              } catch (Exception e) {
                if (exceptionHandler != null) {
                  exceptionHandler.handle(e);
                }
              }
            }
          } finally {
            if (!submitted) {
              this.context.runOnContext(v -> {
                this.polling.set(false);
                schedule(0);
              });
            }
          }
        });
      }
    }

this.consumer.poll() throws a SerializationException, which is passed to the exceptionHandler in the catch block. Since nothing was submitted, the finally block calls schedule(0) to poll again, but the bad record is never skipped, so the next poll throws the same SerializationException at the same offset. I think that's the infinite loop.

Also, there is not enough information in the exception, so I cannot seek or take any other action in the exceptionHandler.

Maybe it should auto-skip the record and log a warning? Or throw a more meaningful exception so the caller can do something in the exceptionHandler?

Need help, thanks.

Version

4.2.6

cyhii avatar Jun 01 '22 16:06 cyhii

can you provide a reproducer ?

vietj avatar Jun 01 '22 17:06 vietj

can you provide a reproducer ?

Sure, I've made a commit to my demo project to reproduce this issue.

Steps:

  1. Start a Kafka server on localhost (follow the quickstart)
  2. Start the application above
  3. Send a non-JSON message, and it happens:
$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server 127.0.0.1:9092
>send a plaintext message

Uncomment the line in the exceptionHandler to see the logs:

    consumer.exceptionHandler {
      // WARN: uncomment this line below then you will get log storm
      // log.warn("failed to consume message : {}", it.message, it)
    }

It will print tons of logs like this:

12:14:16.099 [vert.x-kafka-consumer-thread-0] WARN  com.example.starter.MyKafkaConsumer - failed to consume message : Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: io.vertx.core.json.DecodeException: Failed to decode:Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
	at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:129)
	at io.vertx.core.json.jackson.DatabindCodec.fromBuffer(DatabindCodec.java:99)
	at io.vertx.core.json.JsonObject.fromBuffer(JsonObject.java:948)
	at io.vertx.core.json.JsonObject.<init>(JsonObject.java:85)
	at io.vertx.core.buffer.impl.BufferImpl.toJsonObject(BufferImpl.java:110)
	at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:39)
	at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:28)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
	at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$pollRecords$6(KafkaReadStreamImpl.java:154)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2683)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:865)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
	at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4484)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2730)
	at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:126)
	... 22 common frames omitted

cyhii avatar Jun 02 '22 04:06 cyhii

@ppatierno can you have a look

vietj avatar Jun 08 '22 08:06 vietj

Not sure I can take a look at this right now. @cyhii, any chance of a contribution I can review and help with?

ppatierno avatar Jun 14 '22 07:06 ppatierno

What you have here, @cyhii, is what is called a "poison pill": a corrupted (or simply invalid) record that blocks consumption.

This would also happen with the default Kafka consumer.

I think what we are missing here is an equivalent of Spring's ErrorHandlingDeserializer or Kafka Streams' DeserializationExceptionHandler.

Meaning: a class, callback, or any mechanism we could configure with the appropriate behaviour:

  • by default, keep it as-is: crash / loop forever
    • but change the exception type to provide more information (for instance the record's offset, so that a user can seek)
  • in addition (later, maybe?) provide different exception handlers out of the box, like LogAndContinueExceptionHandler from Kafka Streams or DeadLetterPublishingRecoverer from Spring, which are the most common strategies
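The wrapping-deserializer idea can be sketched without any Kafka dependency: delegate to the real deserializer and, on failure, log and yield an empty value that the caller can filter out. This is a minimal, hypothetical illustration of the pattern (the class and method names here are invented, and the real Spring ErrorHandlingDeserializer also implements Kafka's Deserializer interface and captures the failed raw bytes):

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, dependency-free sketch of the "error handling deserializer"
// pattern: a wrapper that delegates to the real deserializer and, instead of
// throwing on a poison pill, logs and returns an empty Optional so the caller
// can skip the record and keep consuming.
public class TolerantDeserializer {

    // Wrap a delegate: success yields Optional.of(value), failure is logged
    // and yields Optional.empty().
    static <T> Function<String, Optional<T>> tolerant(Function<String, T> delegate) {
        return raw -> {
            try {
                return Optional.of(delegate.apply(raw));
            } catch (RuntimeException e) {
                System.err.println("skipping undeserializable record: " + e.getMessage());
                return Optional.empty();
            }
        };
    }

    public static void main(String[] args) {
        var parse = tolerant(Integer::parseInt);
        // "oops" plays the poison pill; it is skipped instead of looping forever.
        List<Integer> values = List.of("1", "2", "oops", "4").stream()
                .map(parse)
                .flatMap(Optional::stream)
                .toList();
        System.out.println(values); // prints [1, 2, 4]
    }
}
```

The key design point is that the wrapper converts a throwing call into a total function, so the decision of what to do with a bad record moves from the poll loop to the caller.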

aesteve avatar Mar 15 '23 07:03 aesteve

Hello again @cyhii .

I gave this issue a try to see how error management could be improved, and started with a test.

It may take a while before we have a more elaborate design (as in Spring), but if you want to work around the issue and deal with the poison pill, you can do this: https://github.com/aesteve/vertx-kafka-client/blob/handle-serialization-exceptions/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java#L1378-L1396

So, correcting myself:

but change the exception type to provide more information (for instance the record's offset, so that a user can seek)

This is actually not necessary, only a cast to RecordDeserializationException is required (after an instanceof check). And this is the way the standard Kafka Consumer works, too.

Also, there is not enough information in the exception, so I cannot seek or take any other action in the exceptionHandler.

With such a cast, you should be able to do seek where needed 🙂
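The essence of that workaround can be shown as a dependency-free simulation in plain Java. The class and names below are hypothetical stand-ins; against the real client you would, inside the exceptionHandler, check `instanceof RecordDeserializationException`, read its topic-partition and offset, and seek the consumer to the failed offset plus one:

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free simulation of "seek past the poison pill". On a
// deserialization failure, the position is advanced past the bad offset
// (the equivalent of consumer.seek(partition, badOffset + 1)) instead of
// being retried forever.
public class SeekPastPoisonPill {

    // Stand-in for deserialization: throws on a non-numeric "record".
    static int deserialize(String raw) {
        return Integer.parseInt(raw);
    }

    // Consume a simulated partition, skipping records that fail to deserialize.
    static List<Integer> consume(List<String> partition) {
        List<Integer> consumed = new ArrayList<>();
        int position = 0; // the consumer's fetch position
        while (position < partition.size()) {
            try {
                consumed.add(deserialize(partition.get(position)));
                position++; // normal advance after a successful poll
            } catch (RuntimeException e) {
                System.err.println("seeking past offset " + position + ": " + e.getMessage());
                position++; // the "seek past the record" step
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Offset 2 holds the plaintext poison pill from the reproducer.
        System.out.println(consume(List.of("1", "2", "send a plaintext message", "4")));
        // prints [1, 2, 4]
    }
}
```

Without the seek step, the catch branch would leave `position` unchanged and the loop would retry the same offset indefinitely, which is exactly the infinite loop described in this issue.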

Still, having more elaborate exception handlers could be interesting, but with this you'd deal with the poison pill the same way you would with the standard Kafka consumer client.

Hope this helps.

aesteve avatar Mar 18 '23 16:03 aesteve