sdk-java icon indicating copy to clipboard operation
sdk-java copied to clipboard

Cannot close a Kafka Producer / Consumer which uses CloudEvent Serialization

Open tehtea opened this issue 2 years ago • 1 comments

Caused by https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventSerializer.java and https://github.com/cloudevents/sdk-java/blob/master/kafka/src/main/java/io/cloudevents/kafka/CloudEventDeserializer.java not implementing the close method

Sample Stack Trace for closing a Producer:

[main] ERROR org.apache.kafka.common.utils.Utils - Failed to close producer valueSerializer with type io.cloudevents.kafka.CloudEventSerializer
java.lang.AbstractMethodError: Method io/cloudevents/kafka/CloudEventSerializer.close()V is abstract
	at io.cloudevents.kafka.CloudEventSerializer.close(CloudEventSerializer.java)
	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:855)
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1187)
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1138)
	at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1117)

Sample Stack Trace for closing a Consumer:

org.apache.kafka.common.KafkaException: Failed to close kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2195)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2145)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2096)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.wrapUp(KafkaMessageListenerContainer.java:1153)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:967)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: Method io/cloudevents/kafka/CloudEventDeserializer.close()V is abstract
	at io.cloudevents.kafka.CloudEventDeserializer.close(CloudEventDeserializer.java)
	at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2.close(ErrorHandlingDeserializer2.java:222)
	at org.apache.kafka.common.utils.Utils.closeQuietly(Utils.java:855)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2187)
	... 7 common frames omitted

tehtea avatar Jun 14 '22 05:06 tehtea

Hi @tehtea, please specify how to reproduce because just by importing this integration it's definitely working

pierDipi avatar Jun 14 '22 06:06 pierDipi

Closing due to missing details

pierDipi avatar Jan 10 '23 16:01 pierDipi