sdk-java
Cannot close a Kafka Producer / Consumer which uses CloudEvent Serialization
This is caused by https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java and https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java not implementing the close() method.
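To illustrate the kind of change this implies, here is a minimal sketch, assuming the serializer holds no resources that need releasing. EventSerializer and StringEventSerializer are hypothetical stand-ins written for this example; they are not the Kafka or sdk-java types, and the actual fix in the SDK may look different.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a serializer contract whose close() has no default body. */
interface EventSerializer<T> extends AutoCloseable {
    byte[] serialize(String topic, T data);

    @Override
    void close(); // abstract here, so every implementer must supply a body
}

/** Hypothetical serializer that owns no resources and therefore closes as a no-op. */
final class StringEventSerializer implements EventSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // Pass null through unchanged; otherwise encode as UTF-8.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // Nothing to release. Providing an explicit body means a caller that
        // invokes close() during shutdown finds a concrete method to run.
    }
}
```

With an explicit close() in place, a caller can shut the serializer down (for example through try-with-resources) without depending on the interface supplying a default implementation.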
Sample Stack Trace for closing a Producer:
[main] ERROR org.apache.kafka.common.utils.Utils - Failed to close producer valueSerializer with type io.cloudevents.kafka.CloudEventSerializer
java.lang.AbstractMethodError: Method io/cloudevents/kafka/CloudEventSerializer.close()V is abstract
at io.cloudevents.kafka.CloudEventSerializer.close(CloudEventSerializer.java)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:855)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1187)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1138)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1117)
Sample Stack Trace for closing a Consumer:
org.apache.kafka.common.KafkaException: Failed to close kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2195)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2145)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2096)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.wrapUp(KafkaMessageListenerContainer.java:1153)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:967)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: Method io/cloudevents/kafka/CloudEventDeserializer.close()V is abstract
at io.cloudevents.kafka.CloudEventDeserializer.close(CloudEventDeserializer.java)
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.close(ErrorHandlingDeserializer2.java:222)
at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:855)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2187)
... 7 common frames omitted
Hi @tehtea, please specify how to reproduce this, because simply importing this integration definitely works.
Closing due to missing details