kafka-connect-fs
Not able to ingest multilevel nested JSON files
I have a JSON file with the following content:
{ "integerField": "0", "structField": { "structField1": { "integerField": "0", "longField": "922", "stringField": "0_56c08500-02ff-4434-8fa6-8affccb6d60b", "booleanField": true, "decimalField": "0.0" } } }
I deployed the connector without providing an Avro schema in the connector definition.
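For reference, the connector definition was along these lines. This is a minimal sketch: the URIs, topic name, policy, and converter settings shown here are placeholders, not my exact values.

{
  "name": "fs-source-json",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "file:///path/to/json/files",
    "topic": "nested-json",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.regexp": ".*\\.json$",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}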
I get the following error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
    at org.apache.avro.Schema$Names.put(Schema.java:1511)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
    at org.apache.avro.Schemas.toString(Schemas.java:46)
    at org.apache.avro.Schemas.toString(Schemas.java:30)
    at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:155)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:214)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:252)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:81)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:155)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Scenario 2: Even providing an Avro schema in the connector definition does not work, because the extractedSchema built from the data in JacksonFileReader does not match the provided Avro schema due to a name mismatch.
The provided Avro schema fields look like:

[Field{name=integerField, index=0, schema=Schema{STRING}},
 Field{name=structField, index=1, schema=Schema{structField:STRUCT}}]

The connector-generated schema fields look like:

[Field{name=integerField, index=0, schema=Schema{STRING}},
 Field{name=structField, index=1, schema=Schema{STRUCT}}]
The name in the struct field differs. Error:

org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
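For illustration, a schema with explicitly named nested records would look roughly like the sketch below. The top-level record name (Envelope) is hypothetical, and the field types are guessed from the sample data above (mostly strings), so this is not necessarily the exact schema provided; the point is that the nested struct carries the name structField, while the schema the connector extracts leaves it unnamed.

{
  "type": "record",
  "name": "Envelope",
  "fields": [
    { "name": "integerField", "type": "string" },
    {
      "name": "structField",
      "type": {
        "type": "record",
        "name": "structField",
        "fields": [
          {
            "name": "structField1",
            "type": {
              "type": "record",
              "name": "structField1",
              "fields": [
                { "name": "integerField", "type": "string" },
                { "name": "longField", "type": "string" },
                { "name": "stringField", "type": "string" },
                { "name": "booleanField", "type": "boolean" },
                { "name": "decimalField", "type": "string" }
              ]
            }
          }
        ]
      }
    }
  ]
}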
Please let me know if there is any workaround for this issue.
How did you configure the Avro schema?
By using "file_reader.avro.schema.value".
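Roughly like this, embedded in the connector config as an escaped JSON string (a sketch only; the schema body is abbreviated here and follows the named-record layout sketched above):

{
  "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader",
  "file_reader.avro.schema.value": "{\"type\":\"record\",\"name\":\"Envelope\",\"fields\":[{\"name\":\"integerField\",\"type\":\"string\"},{\"name\":\"structField\",\"type\":{\"type\":\"record\",\"name\":\"structField\",\"fields\":[ ... ]}}]}"
}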