
NoSuchFieldException when using consumer inherited from KafkaConsumer

Open micr0farad opened this issue 3 years ago • 7 comments

Issue description

Hey! I'm trying to skip malformed messages by inheriting from KafkaConsumer and overriding the poll() method:

import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

class ExceptionHandlingKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public ExceptionHandlingKafkaConsumer(final Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            return super.poll(timeout);
        } catch (RecordDeserializationException e) { // kafka-clients 2.8+
            this.seek(e.topicPartition(), e.offset() + 1); // partition and offset retrieved from exception instance
            return ConsumerRecords.empty();
        }
    }
}

But then I get this error on startup:

java.lang.NoSuchFieldException: coordinator
	at java.base/java.lang.Class.getDeclaredField(Class.java:2411)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.checkAutoCommitIsDisabled(AbstractParallelEoSStreamProcessor.java:343)
	at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.<init>(AbstractParallelEoSStreamProcessor.java:199)
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.<init>(ParallelEoSStreamProcessor.java:34)
	at io.confluent.parallelconsumer.ParallelStreamProcessor.createEosStreamProcessor(ParallelStreamProcessor.java:28)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig.mediaParallelConsumer(MediaEventsReceiverConfig.java:69)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.CGLIB$mediaParallelConsumer$1(<generated>)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec$$FastClassBySpringCGLIB$$ff4101fd.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331)
	at com.avaya.ixo.configuration.kafka.consumer.MediaEventsReceiverConfig$$EnhancerBySpringCGLIB$$97d964ec.mediaParallelConsumer(<generated>)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 49 common frames omitted

Possible solutions

Workaround

Use composition instead of inheritance, so that AbstractParallelEoSStreamProcessor will skip this check. A sketch of this approach follows.
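
For illustration, a minimal sketch of the composition workaround (the class and method names here are mine, not part of PC, and it assumes kafka-clients 2.8+ for RecordDeserializationException): a JDK dynamic proxy implements the Consumer interface and delegates to the real KafkaConsumer, so the object handed to PC no longer extends KafkaConsumer and the reflective field lookup is never attempted.

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class SkippingConsumer {

    // Wraps the real consumer so the returned object implements Consumer
    // without extending KafkaConsumer, bypassing PC's reflection-based check
    @SuppressWarnings("unchecked")
    public static <K, V> Consumer<K, V> wrap(KafkaConsumer<K, V> delegate) {
        return (Consumer<K, V>) Proxy.newProxyInstance(
                Consumer.class.getClassLoader(),
                new Class<?>[] { Consumer.class },
                (proxy, method, args) -> {
                    try {
                        return method.invoke(delegate, args);
                    } catch (InvocationTargetException ite) {
                        // Intercept deserialization failures from poll() and skip the bad record
                        if ("poll".equals(method.getName())
                                && ite.getCause() instanceof RecordDeserializationException) {
                            RecordDeserializationException rde =
                                    (RecordDeserializationException) ite.getCause();
                            delegate.seek(rde.topicPartition(), rde.offset() + 1);
                            return ConsumerRecords.empty();
                        }
                        throw ite.getCause();
                    }
                });
    }
}

The wrapped consumer can then be handed to PC's options in place of the raw KafkaConsumer.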

Proper fix

Probably implement the enable.auto.commit check without using reflection, as there's no guarantee that the consumer is directly inherited from KafkaConsumer, so we can't reliably use #getSuperclass() on it.
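
For illustration, a hypothetical non-reflective check, assuming PC were handed the consumer's config map rather than just the Consumer instance (it isn't today, which is why the reflection exists):

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

class AutoCommitCheck {
    // Hypothetical sketch: validate the user-supplied config map directly
    // instead of reflecting into KafkaConsumer internals. Kafka's default
    // for enable.auto.commit is true, so an absent key counts as enabled.
    static void checkAutoCommitIsDisabled(Map<String, Object> configs) {
        Object enabled = configs.getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        if (Boolean.parseBoolean(String.valueOf(enabled))) {
            throw new IllegalStateException(
                    "enable.auto.commit must be disabled when using parallel-consumer");
        }
    }
}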

micr0farad avatar Feb 15 '22 10:02 micr0farad

Probably implement the enable.auto.commit check without using reflection

I would looooove to do that, but when I implemented it, I couldn't think of any other way to make sure it's not enabled...

astubbs avatar May 06 '22 09:05 astubbs

The root issue here, however, is what PC should do with deserialization errors. Why don't we just add some handling logic to PC itself?

Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?

@JorgenRingen how are you handling this issue?

astubbs avatar May 06 '22 09:05 astubbs

Also - in your code, aren't you actually skipping over ALL records if /any/ have a deserialization issue?

Yes, just skip and log the error

micr0farad avatar May 06 '22 11:05 micr0farad

We use kafka-streams in front with Avro, so it's not a practical issue for us, as we control the input topic. Our deserializer would also just log the error and return null.

However, it would be really nice if this was handled by the framework with sensible defaults. I kind of like the kafka-streams approach, with default and customizable exception handlers through config: https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_default.deserialization.exception.handler

It uses org.apache.kafka.streams.errors.LogAndFailExceptionHandler by default. There's also an org.apache.kafka.streams.errors.LogAndContinueExceptionHandler, which I think makes more sense in most scenarios. And you can implement your own with all sorts of different logic (write to a DLQ, for example).
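
For reference, a minimal sketch of wiring up the continue-on-error handler via the standard Streams config constant:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

class StreamsErrorHandlingConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        // Log and skip malformed records instead of failing the application
        // (the default is LogAndFailExceptionHandler)
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class.getName());
        return props;
    }
}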

There's also "production-exception-handler" for serialization errors.

JorgenRingen avatar May 06 '22 13:05 JorgenRingen

Btw, why isn't this a core feature of the Kafka consumer? It seems like the only options are to create a "non-throwing" serde or some sort of seek logic. Or to use spring-kafka, which seems to be the most popular solution on SO 😆

JorgenRingen avatar May 06 '22 13:05 JorgenRingen

See https://github.com/confluentinc/parallel-consumer/pull/291 for the implementation of SKIP and SHUTDOWN for user function failures - it will be released in the next feature release. Keen to get feedback!

  • #291

We should open a separate issue for deserialization issues:

  • #304

astubbs avatar May 13 '22 14:05 astubbs

I ran into the same start-up error when using PC with a Quarkus native build.

bartman64 avatar May 30 '22 09:05 bartman64

this.seek(e.topicPartition(), e.offset() + 1); // partition and offset retrieved from exception instance

BTW - this is incorrect - it assumes only a single record was polled from the broker.
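
A sketch of a poll() override (inside a subclass like ExceptionHandlingKafkaConsumer above) that follows the usual seek-and-retry pattern for poison pills instead of returning an empty batch; timeout handling is simplified here:

@Override
public ConsumerRecords<K, V> poll(Duration timeout) {
    while (true) {
        try {
            return super.poll(timeout);
        } catch (RecordDeserializationException e) {
            // Skip only the malformed record, then poll again so any
            // remaining records are still delivered
            this.seek(e.topicPartition(), e.offset() + 1);
        }
    }
}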

astubbs avatar Nov 03 '22 11:11 astubbs