
Failed to deserialize data for topic [topic] to Avro

Open eaudet opened this issue 1 year ago • 3 comments

Hi, the plugin, built from GitHub's main branch, throws an error when consuming a topic with simple JSON messages as well as a topic with Avro messages. I'm using the latest versions of the confluentinc Broker, ZooKeeper, and Connect images, and the opensearchproject OpenSearch image.

I wonder if this is related to the following variable when starting Connect:

CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter

My config:

{
    "name": "opensearch-sink",
    "config": {
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "tasks.max": "1",
        "topics": "some-existing-topic",
        "key.ignore": "true",
        "connection.url": "http://opensearch-node1:9200",
        "type.name": "kafka-connect"
    }
}
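
Note that the config above does not set `key.converter` or `value.converter`, so the worker-wide `CONNECT_KEY_CONVERTER`/`CONNECT_VALUE_CONVERTER` settings (the AvroConverter here) apply to every record this connector reads. A sketch of overriding the converters per connector instead, assuming the topic holds schemaless JSON values (the property names are standard Kafka Connect settings; the choice of converters is illustrative, not from the original report):

```json
{
    "name": "opensearch-sink",
    "config": {
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "tasks.max": "1",
        "topics": "some-existing-topic",
        "key.ignore": "true",
        "connection.url": "http://opensearch-node1:9200",
        "type.name": "kafka-connect",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
    }
}
```

Per-connector converter settings take precedence over the worker defaults, so an Avro worker can still host a JSON-topic connector this way.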

Connect stack trace

[2022-12-09 21:02:36,297] ERROR WorkerSinkTask{id=opensearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
2022-12-09T21:02:36.298045183Z org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
2022-12-09T21:02:36.298063297Z 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
2022-12-09T21:02:36.298067853Z 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
2022-12-09T21:02:36.298071164Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
2022-12-09T21:02:36.298074122Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
2022-12-09T21:02:36.298077269Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
2022-12-09T21:02:36.298080134Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
2022-12-09T21:02:36.298082970Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2022-12-09T21:02:36.298086088Z 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2022-12-09T21:02:36.298089054Z 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
2022-12-09T21:02:36.298092333Z 	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-12-09T21:02:36.298095317Z 	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-12-09T21:02:36.298098179Z 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-12-09T21:02:36.298101007Z 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-12-09T21:02:36.298103968Z 	at java.base/java.lang.Thread.run(Thread.java:829)
2022-12-09T21:02:36.298107141Z Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic role-received to Avro: 
2022-12-09T21:02:36.298110427Z 	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
2022-12-09T21:02:36.298113334Z 	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
2022-12-09T21:02:36.298116211Z 	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
2022-12-09T21:02:36.298122519Z 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
2022-12-09T21:02:36.298125917Z 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
2022-12-09T21:02:36.298129072Z 	... 13 more
2022-12-09T21:02:36.298131874Z Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2022-12-09T21:02:36.298134802Z 	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
2022-12-09T21:02:36.298174390Z 	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)
2022-12-09T21:02:36.298181482Z 	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)
2022-12-09T21:02:36.298184797Z 	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
2022-12-09T21:02:36.298187722Z 	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
2022-12-09T21:02:36.298190604Z 	... 17 more
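
For context on the root `Unknown magic byte!` error: the Confluent AvroConverter expects each message to be in the Confluent Schema Registry wire format, whose first byte is a magic byte `0x00` followed by a 4-byte big-endian schema ID and then the Avro-encoded body. A plain JSON message (which starts with `{`, byte `0x7b`) fails that first-byte check immediately. A minimal sketch of the check, assuming the documented wire format (the helper name and sample payloads are illustrative):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: byte 0 must be 0x00

def looks_like_confluent_avro(payload: bytes) -> bool:
    """Return True if the payload carries the Confluent wire-format header:
    1 magic byte (0x00) + a 4-byte big-endian schema ID + the Avro body."""
    if payload is None or len(payload) < 5:
        return False
    return payload[0] == MAGIC_BYTE

# A message shaped like KafkaAvroSerializer output (schema ID 42 is made up):
avro_like = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"\x02a"
plain_json = b'{"role": "admin"}'

print(looks_like_confluent_avro(avro_like))   # True
print(looks_like_confluent_avro(plain_json))  # False -> "Unknown magic byte!"
```

This is why the AvroConverter rejects JSON records outright: it never gets past the header check to attempt deserialization.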

eaudet avatar Dec 09 '22 21:12 eaudet

I have changed the Connect key and value converters to org.apache.kafka.connect.storage.StringConverter. This works, but it still fails with JSON or Avro payloads. I guess it's not related to the plugin itself but to Kafka Connect.

eaudet avatar Dec 09 '22 22:12 eaudet

@eaudet does this mean that this ticket should be closed, or is there a change that you think needs to be made?

Claudenw avatar Mar 07 '23 09:03 Claudenw

If you have data in your input topic and want it deserialized, your converter will need to support every type of data in the input topic. The JsonConverter and AvroConverter each only support one format of input data, so a mixed input topic will always fail to deserialize at least some of the messages. In practice, people do not use mixed topics because of the difficulty associated with deserializing them.

@eaudet You should either pick a standard format (globally or per topic) and use the standard converters, or implement a converter which meets your needs. Either way, this is not an improvement that can be made in this connector. This connector always serializes using JSON, and needs a deserializing converter which produces structured data (not String or Bytes).
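
Relatedly, since the task died with "Tolerance exceeded in error handler", records that cannot be deserialized can be routed to a dead letter queue instead of killing the task while the format mismatch is being sorted out. A sketch using the standard Kafka Connect sink error-handling properties (the DLQ topic name is an assumption), added to the connector's `config` block:

```json
{
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "opensearch-sink-dlq",
    "errors.deadletterqueue.context.headers.enable": "true"
}
```

This hides rather than fixes the converter mismatch, but it keeps the task running and lets you inspect the rejected records (with failure context in their headers) on the DLQ topic.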

gharris1727 avatar Apr 05 '23 20:04 gharris1727