parallel-consumer
NoSuchFieldException when using consumer inherited from KafkaConsumer
Issue description
Hey! I'm trying to skip malformed messages by subclassing KafkaConsumer and overriding the behavior of its poll() method.
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

class ExceptionHandlingKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public ExceptionHandlingKafkaConsumer(final Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            return super.poll(timeout);
        } catch (Exception e) {
            if (!(e instanceof RecordDeserializationException)) {
                throw e;
            }
            RecordDeserializationException rde = (RecordDeserializationException) e;
            // partition and offset retrieved from the exception instance
            this.seek(rde.topicPartition(), rde.offset() + 1);
            return ConsumerRecords.empty();
        }
    }
}
But then I get this error on startup:
java.lang.NoSuchFieldException: coordinator
at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:343)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:199)
at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:34)
at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:28)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig.mediaParallelConsumer(MediaEventsReceiverConfig.java:69)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.CGLIB$mediaParallelConsumer$1(<generated>)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec$$FastClassBySpringCGLIB$$ff4101fd.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.mediaParallelConsumer(<generated>)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 49 common frames omitted
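(For context, a minimal sketch of how such a consumer reaches PC - builder names are from the PC README; the holder class and topic name are hypothetical, not the reporter's Spring config. The reflective check fires inside createEosStreamProcessor.)

import java.util.List;
import java.util.Map;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

class Wiring {
    static ParallelStreamProcessor<String, String> build(Map<String, Object> consumerConfigs) {
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .consumer(new ExceptionHandlingKafkaConsumer<>(consumerConfigs))
                .build();
        // the reflective "coordinator" field lookup happens in here
        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("media-events")); // hypothetical topic name
        return processor;
    }
}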
Possible solutions
Workaround
Use composition instead of inheritance, so that AbstractParallelEoSStreamProcessor will skip this check (see the sketch below).
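A minimal sketch of the composition route, using a JDK dynamic proxy so every Consumer method is delegated without hand-writing dozens of overrides (hypothetical SkippingConsumer helper, assuming Kafka 2.8+'s RecordDeserializationException):

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class SkippingConsumer {

    // Wraps a KafkaConsumer so PC receives a plain Consumer implementation:
    // the proxy is not a KafkaConsumer subclass, so the reflective
    // checkAutoCommitIsDisabled lookup is skipped entirely.
    @SuppressWarnings("unchecked")
    public static <K, V> Consumer<K, V> wrap(KafkaConsumer<K, V> delegate) {
        return (Consumer<K, V>) Proxy.newProxyInstance(
                SkippingConsumer.class.getClassLoader(),
                new Class<?>[]{Consumer.class},
                (proxy, method, args) -> {
                    try {
                        return method.invoke(delegate, args);
                    } catch (InvocationTargetException ite) {
                        if ("poll".equals(method.getName())
                                && ite.getCause() instanceof RecordDeserializationException) {
                            RecordDeserializationException rde =
                                    (RecordDeserializationException) ite.getCause();
                            // skip past the poison pill, same as the subclass above
                            delegate.seek(rde.topicPartition(), rde.offset() + 1);
                            return ConsumerRecords.empty();
                        }
                        throw ite.getCause();
                    }
                });
    }
}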
Proper fix
Probably implement the enable.auto.commit check without using reflection, as there's no guarantee that the consumer will directly inherit from KafkaConsumer, so we can't reliably call #getSuperclass() on it.
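For illustration, a sketch of what a reflection-free check could look like if PC accepted the consumer's config map alongside the instance (hypothetical AutoCommitGuard; PC's actual API only receives the Consumer, which is exactly the difficulty acknowledged below):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

class AutoCommitGuard {
    // Hypothetical reflection-free check: validate the user-supplied config
    // map instead of reading KafkaConsumer's private "coordinator" field.
    // Kafka's default for enable.auto.commit is true, so absence counts as enabled.
    static void checkAutoCommitIsDisabled(Map<String, Object> consumerConfig) {
        Object enabled = consumerConfig.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        if (Boolean.parseBoolean(String.valueOf(enabled))) {
            throw new IllegalStateException(
                    "enable.auto.commit must be false when using parallel-consumer");
        }
    }
}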
> Probably implement the enable.auto.commit check without using reflection
I would looooove to do that, but when I implemented it, I couldn't think of any other way to make sure it's not enabled...
The root issue here, however, is: what should PC do with deserialization errors? Why don't we just add some handling logic to PC itself?
Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?
@JorgenRingen how are you handling this issue?
> Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?
Yes, just skip and log the error.
We use kafka-streams in front with Avro, so it's not a practical issue for us, as we control the input topic. Our deserializer would also just log the error and return null.
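That pattern is easy to sketch: a wrapper deserializer (hypothetical LogAndNullDeserializer, with the real deserializer supplied as a delegate) that logs and returns null instead of throwing:

import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper: delegates to a real deserializer, but logs and
// returns null on failure instead of throwing, so poll() never blows up.
public class LogAndNullDeserializer<T> implements Deserializer<T> {

    private static final Logger log = LoggerFactory.getLogger(LogAndNullDeserializer.class);

    private final Deserializer<T> delegate;

    public LogAndNullDeserializer(Deserializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return delegate.deserialize(topic, data);
        } catch (Exception e) {
            log.error("Dropping undeserializable record on topic {}", topic, e);
            return null; // downstream code must filter out null values
        }
    }
}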
However, it would be really nice if this was handled by the framework with sensible defaults. I kind of like the kafka-streams approach, with default and customizable exception handlers set through config: https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_default.deserialization.exception.handler
It uses org.apache.kafka.streams.errors.LogAndFailExceptionHandler by default. There's also an org.apache.kafka.streams.errors.LogAndContinueExceptionHandler, which I think makes more sense in most scenarios. And you can implement your own with all sorts of different logic (write to a DLQ, for example).
There's also a "production-exception-handler" for serialization errors.
Btw, why isn't this a core feature of kafka-consumer? It seems like the only options are to create a "non-throwing" serde, to build some sort of seek logic, or to use spring-kafka, which seems like the most popular solution on SO 😆
See https://github.com/confluentinc/parallel-consumer/pull/291 for the implementation of SKIP and SHUTDOWN for user function failure - it will be released in the next feature release. Keen to get feedback!
- #291
We should open a separate issue for deserialization errors:
- #304
I ran into the same start-up error when using PC with a Quarkus native build.
> this.seek(rde.topicPartition(), rde.offset() + 1); // partition and offset retrieved from the exception instance
BTW - this is incorrect - it assumes only a single record was polled from the broker, when a single poll can return a whole batch of records across partitions.
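A sketch of a batch-safe alternative, again assuming Kafka 2.8+'s RecordDeserializationException: retry the poll after seeking, rather than returning ConsumerRecords.empty() and discarding whatever else was fetched. This would replace the poll() override in the subclass from the issue description:

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

// inside ExceptionHandlingKafkaConsumer<K, V>
@Override
public ConsumerRecords<K, V> poll(Duration timeout) {
    while (true) {
        try {
            // a successful retry returns the rest of the batch instead
            // of silently dropping it via ConsumerRecords.empty()
            return super.poll(timeout);
        } catch (RecordDeserializationException rde) {
            // skip only the poison pill, then poll again (note: the
            // full timeout is reused on every retry)
            seek(rde.topicPartition(), rde.offset() + 1);
        }
    }
}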