Missing consumerRecord in KafkaListenerExceptionHandler
To have a better chance of recovering from errors, I'd like to log errors in a KafkaListenerExceptionHandler and, no matter what caused the exception, seek past the corrupted records.
Unfortunately, when implementing a custom KafkaListenerExceptionHandler, the KafkaListenerExceptions I get do not contain a consumerRecord from which I could get the topic, partition and offset.
The documentation says:
The KafkaListenerExceptionHandler receives an exception of type KafkaListenerException which allows access to the original ConsumerRecord, if available.
But what are the cases where the consumer record is available and what are the cases where it is not?
I am using batch processing to batch my sql statements. Could that be the reason?
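To illustrate, this is roughly the handler I have in mind (just a sketch; the class name is mine, and the seek only works when the record is actually present on the exception):

import io.micronaut.configuration.kafka.exceptions.DefaultKafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.context.annotation.Replaces;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Singleton; // jakarta.inject.Singleton on newer Micronaut versions
import java.util.Optional;

@Singleton
@Replaces(DefaultKafkaListenerExceptionHandler.class)
public class LoggingSeekPastExceptionHandler implements KafkaListenerExceptionHandler {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingSeekPastExceptionHandler.class);

    @Override
    public void handle(KafkaListenerException exception) {
        Optional<ConsumerRecord<?, ?>> consumerRecord = exception.getConsumerRecord();
        if (consumerRecord.isPresent()) {
            ConsumerRecord<?, ?> record = consumerRecord.get();
            LOG.error("Error processing record from topic {} partition {} offset {}",
                    record.topic(), record.partition(), record.offset(), exception);
            // seek past the corrupted record so the consumer does not get stuck on it
            exception.getKafkaConsumer().seek(
                    new TopicPartition(record.topic(), record.partition()),
                    record.offset() + 1);
        } else {
            // this is the case I am asking about: no record to seek past
            LOG.error("Kafka listener error without a consumer record", exception);
        }
    }
}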
@ckosmowski, if you take a look at KafkaConsumerProcessor, assuming you are using @KafkaListener, you can see that there are a few locations where the exception handler is invoked without the ConsumerRecord:
Line 547: https://github.com/micronaut-projects/micronaut-kafka/blob/67a8ea172bae75847e133988bb10696c6512d322/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java#L543-L551
Line 558: https://github.com/micronaut-projects/micronaut-kafka/blob/67a8ea172bae75847e133988bb10696c6512d322/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java#L555-L559
These scenarios might be more on the unlikely side.
I believe there is also the case where deserialization fails.
Okay, thanks for the info. In my case the deserialization fails; it would have been good to have the record so we could move it to another topic for later manual inspection.
What kind of deserializer are you using? In some cases, you can get the message bytes in case of deserialization failure. If not supported out of the box, you can create one which then throws an exception with the failed message bytes.
We stayed with the default, which should be:
https://github.com/micronaut-projects/micronaut-kafka/blob/master/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java
I think the following could be a problem for us:
https://github.com/micronaut-projects/micronaut-kafka/blob/67a8ea172bae75847e133988bb10696c6512d322/kafka/src/main/java/io/micronaut/configuration/kafka/serde/JsonSerde.java#L78-L83
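For reference, the linked deserialize method looks roughly like this (paraphrased from the linked lines, not exact; note that only the empty-Optional path produces a SerializationException, while anything thrown by objectSerializer.deserialize itself propagates unchanged):

// Paraphrased from JsonSerde#deserialize (see the link above); not exact.
@Override
public T deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    return objectSerializer.deserialize(data, type)
            .orElseThrow(() -> new SerializationException(
                    "Unable to deserialize data: " + new String(data)));
}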
So let me start over. What we actually want to use is the seekPastDeserializationError feature of the DefaultKafkaListenerExceptionHandler. That handler, however, relies on the thrown exception being a SerializationException.
In the above code snippet there are many ways for the exception raised from the deserialize method to be of a different type. For example, if the JSON data contains a string value that is to be parsed into a date and that string has the wrong format, an IllegalArgumentException or a DateTimeParseException will be raised, depending on the target type.
So before the orElseThrow call can kick in for a missing Optional, the deserialize method may already have thrown any other type of exception.
Perhaps an enhancement can be made to construct the SerializationException with the message bytes. That way the exception handler can do what it wants with the bytes to determine what went wrong.
Our problem here is that the call to objectSerializer.deserialize(data, type) throws exceptions other than SerializationException, and those exceptions are not handled by the seekPastDeserializationError feature.
What exception is being thrown...? According to:
https://github.com/micronaut-projects/micronaut-core/blob/f042f80f31a4218ff44d3a457737fbd523a0154d/runtime/src/main/java/io/micronaut/jackson/serialize/JacksonObjectSerializer.java#L65-L81
Any IOException that occurs gets wrapped in a SerializationException...
Yes, that's correct. But there is IllegalArgumentException and/or DateTimeParseException, for example, when a string is being converted to a Date by the objectSerializer. And I think there can be more, like NumberFormatException and so on: everything that can happen when a string representation of a JSON object is deserialized.
Ahh, I see, so what would best benefit you? To have a global exception catch there?
A global exception catch that rethrows a SerializationException would probably do the trick. If I am correct, it would let the seekPastDeserializationError feature do its work and jump over the corrupted records (I think that's also the spot where the consumerRecord information gets important). If that's the case, that would be the best fit, at least for us.
By the way: thanks for all the fast responses and the effort you people put into the project.
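To make it concrete, the change I have in mind inside JsonSerde#deserialize would be something like this (only a sketch of the idea, not the actual code):

// Sketch of the proposed catch-all, using org.apache.kafka.common.errors.SerializationException:
@Override
public T deserialize(String topic, byte[] data) {
    try {
        return objectSerializer.deserialize(data, type)
                .orElseThrow(() -> new SerializationException(
                        "Unable to deserialize data: " + new String(data)));
    } catch (SerializationException e) {
        throw e; // already the type that seekPastDeserializationError understands
    } catch (Exception e) {
        // wrap IllegalArgumentException, DateTimeParseException, etc.
        // so the seek-past logic can kick in
        throw new SerializationException("Error deserializing value", e);
    }
}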
@graemerocher is that just what the doctor ordered in this case? If so, happy to open a PR...
Also, what do you think about contextual byte[] data information being added to the SerializationException? Perhaps a KafkaSerializationException which has a cause of SerializationException and also the byte[] data alongside it?
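Something like this hypothetical shape is what I mean (extending Kafka's SerializationException would keep existing instanceof checks working):

import org.apache.kafka.common.errors.SerializationException;

// Hypothetical exception carrying the raw payload alongside the failure.
public class KafkaSerializationException extends SerializationException {

    private final byte[] data;

    public KafkaSerializationException(SerializationException cause, byte[] data) {
        super("Failed to deserialize message payload", cause);
        this.data = data;
    }

    public byte[] getData() {
        return data;
    }
}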
So is there anything that can be done with this issue? It is unclear to me.
@graemerocher, I am only seeing a potential for a small enhancement to include the bytes of the message which failed deserialization in the exception, or perhaps to include some topic partition information, since there is no consumer record available. Can we create the consumer record with type byte[]?
I don't think that my real problem is clear here. Please see https://github.com/micronaut-projects/micronaut-kafka/issues/110#issuecomment-603240398
The call to objectSerializer.deserialize(data, type) can raise exception types other than SerializationException, which breaks the seekPastDeserializationError setting even when the error occurred while deserializing the message, e.g. an IllegalArgumentException raised by Jackson.
A potential solution would be to catch all exceptions around objectSerializer.deserialize(data, type) and rethrow a SerializationException, OR to simply let the seekPastDeserializationError mechanism work for all raised exceptions.
As it stands, if Jackson raises an error during deserialization, there is no way to jump over the poisoned records.
Is this part clear to you? (I fear my English skills are too weak, so I may be explaining a simple thing in a complicated way.)
Hi guys! I guess this occurs for every batch consumer. Since Micronaut sends the whole batch to the user implementation, it has no way of knowing which record caused the exception (which is then handled by the exception handler).
I guess one option is to send a collection of ConsumerRecord to the exception handler, instead of sending an empty optional. Naturally, this would have to be another interface/method.
In my case, I want to know the earliest offset for each partition in the batch, which is currently impossible AFAIK.
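For example, if the handler were handed the failed batch as a list, the earliest offsets could be computed like this (hypothetical helper, assuming such an interface existed):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: earliest offset per partition in a failed batch.
static Map<TopicPartition, Long> earliestOffsets(List<ConsumerRecord<?, ?>> batch) {
    return batch.stream().collect(Collectors.toMap(
            record -> new TopicPartition(record.topic(), record.partition()),
            ConsumerRecord::offset, // offset of the first record seen for the partition
            Math::min));            // keep the smallest offset per partition
}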
Any thoughts? @ckosmowski @graemerocher @brianwyka
Hi, please see https://github.com/micronaut-projects/micronaut-kafka/issues/110#issuecomment-613487454. The missing ConsumerRecord isn't the real problem for me here. It is the fact that exceptions occurring in deserialize() are not caught and rethrown as SerializationExceptions.
> @graemerocher, I am only seeing a potential for a small enhancement to include the bytes of the message which failed deserialization in the exception, or perhaps to include some topic partition information, since there is no consumer record available. Can we create the consumer record with type byte[]?
Definitely agree. This complicates creation of a DLQ when handling non-parsable / unexpected messages (schema registry goes down, etc.).
I recently ran into this also trying to do a DLQ with Micronaut Kafka.
I had to fiddle with the code really hard in order to accomplish two things:
- I wanted to be able to tell the KafkaListenerExceptionHandler whether it was the key deserialization or the value deserialization that failed
- I wanted to put the message on the DLQ exactly as it came in on the original topic
Here's the strategy I used:
Created a new exception class that held the topic, the byte array and a boolean. I also optionally had this class accept the data deserialized as a Map<*, *>.
class RawDataDeserializationException(
    cause: Throwable?,
    val topic: String,
    val data: ByteArray,
    val key: Boolean,
    dataAsMap: Map<*, *>? = null
) : RuntimeException("Unable to deserialize ${if (key) "key" else "value"}${if (cause != null) ": ${cause.message}" else ""}.", cause) {
    val dataAsString = String(data)
    val serializedData = dataAsMap ?: dataAsString
}
Then I replaced the CompositeSerdeRegistry, as the docs suggest, delegating to the existing JsonSerdeRegistry bean:
@Replaces(CompositeSerdeRegistry::class)
@Singleton
@Primary
class KafkaSerdeRegistry(private val jsonSerdeRegistry: JsonSerdeRegistry) :
    SerdeRegistry by jsonSerdeRegistry {

    override fun <T : Any?> getDeserializer(type: Class<T>?): Deserializer<T> {
        return ExceptionHandlingKafkaDeserializer(getSerde(type), getSerde(Map::class.java))
    }
}
Then I wrote a Deserializer that delegates and catches exceptions, passing in the byte arrays:
class ExceptionHandlingKafkaDeserializer<T>(
    private val serde: Serde<T>,
    private val mapSerde: Serde<Map<*, *>>
) : Deserializer<T> {

    private var key: Boolean = false

    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
        key = isKey
    }

    override fun deserialize(topic: String, data: ByteArray): T {
        try {
            return serde.deserializer().deserialize(topic, data)
        } catch (ex: Exception) {
            val map =
                try {
                    mapSerde.deserializer().deserialize(topic, data)
                } catch (mapEx: Exception) {
                    // the bytes are not even valid JSON; report the original failure
                    throw RawDataDeserializationException(ex, topic, data, key)
                }
            throw RawDataDeserializationException(ex, topic, data, key, map)
        }
    }
}
The biggest problem I ran into with this, though, is that the only thing that calls Deserializer#configure is the Kafka library itself, after it instantiates the deserializer. That option wasn't possible here, since I wanted Micronaut's BeanContext to inject my relevant Serdes.
The KafkaConsumerProcessor code that configures the deserializers before passing them to Kafka doesn't call configure, and there's no way to use Micronaut's BeanContext to override the DefaultKafkaConsumerConfiguration parameter, because it gets instantiated as a POJO here, and that constructor ignores the keyDeserializer and valueDeserializer set on the consumerConfigurationDefaults, which is built and overridable from the context.
That left me with only one option: to "monkey-patch" the library code. So I created a class in the io.micronaut.configuration.kafka.config package called DefaultKafkaConsumerConfiguration, copied the code from this repo, and added two new method overrides:
@Override
public void setKeyDeserializer(Deserializer<K> keyDeserializer) {
    if (keyDeserializer != null) {
        keyDeserializer.configure(null, true);
    }
    super.setKeyDeserializer(keyDeserializer);
}

@Override
public void setValueDeserializer(Deserializer<V> valueDeserializer) {
    if (valueDeserializer != null) {
        valueDeserializer.configure(null, false);
    }
    super.setValueDeserializer(valueDeserializer);
}
With all this together, I was able to get a KafkaListenerException with a SerializationException and the RawDataDeserializationException in the KafkaListenerExceptionHandler.
Then I chose to override the default one and handle the exception by seeking if necessary.
I agree with the comments here that it would be good to add more detail to the KafkaListenerException class, however, I think a small fix that would go a long way would be to update https://github.com/micronaut-projects/micronaut-kafka/blob/master/kafka/src/main/java/io/micronaut/configuration/kafka/config/AbstractKafkaConsumerConfiguration.java to call configure in setKeyDeserializer and setValueDeserializer
Added the configure call to AbstractKafkaConsumerConfiguration and AbstractKafkaProducerConfiguration
Hi, has this been fixed? I'm facing the same error, but in my case the raw data is important because I want to push it into my DLQ. Has anyone dealt with this situation? The only workaround I found is to deserialize twice: first to a JsonNode, and then from the JsonNode to my POJO. This way, when the JsonNode-to-POJO conversion fails, I can still get the data and push it to the DLQ.
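For what it's worth, the double-deserialization workaround looks roughly like this (a sketch; MyPojo and publishToDlq are placeholders for your own type and DLQ producer):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;

import java.io.IOException;

// Sketch of the two-step deserialization described above.
// MyPojo and publishToDlq(...) are placeholders, not real APIs.
public MyPojo deserialize(String topic, byte[] data) {
    ObjectMapper mapper = new ObjectMapper();
    JsonNode node;
    try {
        node = mapper.readTree(data); // step 1: raw bytes -> JsonNode
    } catch (IOException e) {
        throw new SerializationException("Malformed JSON on topic " + topic, e);
    }
    try {
        return mapper.treeToValue(node, MyPojo.class); // step 2: JsonNode -> POJO
    } catch (JsonProcessingException e) {
        publishToDlq(topic, node.toString()); // the parsed data is still available for the DLQ
        throw new SerializationException("Cannot map JSON to MyPojo", e);
    }
}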