opensearch-connector-for-apache-kafka
Failed to deserialize data for topic [topic] to Avro
Hi. The plugin, built from GitHub's main branch, throws an error when consuming both a topic with plain JSON messages and a topic with Avro messages. I'm using the latest versions of the confluentinc Broker, ZooKeeper, and Connect images, and of opensearchproject OpenSearch.
I wonder if this is related to the following environment variable set when starting Connect:
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
My config:
{
"name": "opensearch-sink",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"tasks.max": "1",
"topics": "some-existing-topic",
"key.ignore": "true",
"connection.url": "http://opensearch-node1:9200",
"type.name": "kafka-connect"
}
}
Connect stack trace
[2022-12-09 21:02:36,297] ERROR WorkerSinkTask{id=opensearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
2022-12-09T21:02:36.298045183Z org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
2022-12-09T21:02:36.298063297Z at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
2022-12-09T21:02:36.298067853Z at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
2022-12-09T21:02:36.298071164Z at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
2022-12-09T21:02:36.298074122Z at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
2022-12-09T21:02:36.298077269Z at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
2022-12-09T21:02:36.298080134Z at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
2022-12-09T21:02:36.298082970Z at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2022-12-09T21:02:36.298086088Z at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
2022-12-09T21:02:36.298089054Z at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
2022-12-09T21:02:36.298092333Z at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2022-12-09T21:02:36.298095317Z at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2022-12-09T21:02:36.298098179Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2022-12-09T21:02:36.298101007Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2022-12-09T21:02:36.298103968Z at java.base/java.lang.Thread.run(Thread.java:829)
2022-12-09T21:02:36.298107141Z Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic role-received to Avro:
2022-12-09T21:02:36.298110427Z at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
2022-12-09T21:02:36.298113334Z at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
2022-12-09T21:02:36.298116211Z at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
2022-12-09T21:02:36.298122519Z at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
2022-12-09T21:02:36.298125917Z at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
2022-12-09T21:02:36.298129072Z ... 13 more
2022-12-09T21:02:36.298131874Z Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2022-12-09T21:02:36.298134802Z at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
2022-12-09T21:02:36.298174390Z at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)
2022-12-09T21:02:36.298181482Z at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)
2022-12-09T21:02:36.298184797Z at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
2022-12-09T21:02:36.298187722Z at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
2022-12-09T21:02:36.298190604Z ... 17 more
I have changed the Connect key and value converters to org.apache.kafka.connect.storage.StringConverter. That works for plain string payloads, but with JSON or Avro payloads it still fails. I guess this is not related to the plugin itself but to Kafka Connect.
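For the Avro topic specifically, the AvroConverter also needs to know where the Schema Registry lives. One common approach is to override the worker-level converters per connector. A sketch, assuming a Schema Registry at http://schema-registry:8081 and the Connect REST API on localhost:8083 (both hostnames and ports are assumptions, adjust to your environment):

```shell
# Hypothetical per-connector converter override via the Connect REST API.
# value.converter.schema.registry.url must point at your Schema Registry.
curl -X PUT http://localhost:8083/connectors/opensearch-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "1",
    "topics": "some-existing-topic",
    "key.ignore": "true",
    "connection.url": "http://opensearch-node1:9200",
    "type.name": "kafka-connect",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }'
```

With this override in place, the worker-level CONNECT_KEY_CONVERTER setting no longer applies to this connector.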
@eaudet Does this mean that this ticket should be closed, or is there a change you think needs to be made?
If you have data in your input topic and want it deserialized, your converter will need to support every type of data in the input topic. The JsonConverter and AvroConverter each only support one format of input data, so a mixed input topic will always fail to deserialize at least some of the messages. In practice, people do not use mixed topics because of the difficulty associated with deserializing them.
@eaudet You should either pick a standard format (globally or per topic) and use the standard converters, or implement a converter that meets your needs. Either way, this is not an improvement that can be made in this connector. This connector always serializes using JSON and needs a deserializing converter that reads structured data (not String or Bytes).
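Concretely, for a topic that carries JSON rather than Avro, the per-connector override would point at the JsonConverter instead. A sketch, assuming the JSON messages are wrapped in Connect's schema/payload envelope (if they are bare JSON, set schemas.enable to false, though the connector then receives schemaless data; hostnames and ports are assumptions):

```shell
# Hypothetical config for a JSON topic; compare with the Avro variant above
# only in the value.converter settings.
curl -X PUT http://localhost:8083/connectors/opensearch-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "1",
    "topics": "some-existing-topic",
    "key.ignore": "true",
    "connection.url": "http://opensearch-node1:9200",
    "type.name": "kafka-connect",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }'
```

Whichever format you choose, every producer writing to that topic must use the matching serializer, since the converter is applied to all records uniformly.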