
Not able to ingest multi-level nested JSON files

Open mahendra971 opened this issue 4 years ago • 2 comments

I have a JSON file with the following content:

{ "integerField": "0", "structField": { "structField1": { "integerField": "0", "longField": "922", "stringField": "0_56c08500-02ff-4434-8fa6-8affccb6d60b", "booleanField": true, "decimalField": "0.0" } } }

I deployed the connector without providing an Avro schema in the connector definition.
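The connector definition looked roughly like this (a sketch, not the exact config; the URI, topic, regexp, and registry URL are placeholders):

```json
{
  "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
  "fs.uris": "file:///data/input",
  "topic": "nested-json",
  "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
  "policy.regexp": ".*\\.json$",
  "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```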

I get the following error:

```
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
	at org.apache.avro.Schema$Names.put(Schema.java:1511)
	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
	at org.apache.avro.Schemas.toString(Schemas.java:46)
	at org.apache.avro.Schemas.toString(Schemas.java:30)
	at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:155)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:214)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:276)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:252)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:81)
	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:155)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:314)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:314)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:340)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```
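If I read the trace correctly, AvroConverter assigns the same default record name (io.confluent.connect.avro.ConnectDefault) to every Connect struct that has no explicit name, and Avro refuses to define the same record name twice. A minimal Java sketch of what the conversion runs into (the schemas are hand-built here to mirror what the reader derives from the file above; this is not the connector's actual code):

```java
import io.confluent.connect.avro.AvroData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ConnectDefaultRepro {
    public static void main(String[] args) {
        // Nested structs without explicit names, mirroring the shape of the sample file.
        Schema structField1 = SchemaBuilder.struct()
                .field("integerField", Schema.STRING_SCHEMA)
                .field("longField", Schema.STRING_SCHEMA)
                .field("stringField", Schema.STRING_SCHEMA)
                .field("booleanField", Schema.BOOLEAN_SCHEMA)
                .field("decimalField", Schema.STRING_SCHEMA)
                .build();
        Schema structField = SchemaBuilder.struct()
                .field("structField1", structField1)
                .build();
        Schema value = SchemaBuilder.struct()
                .field("integerField", Schema.STRING_SCHEMA)
                .field("structField", structField)
                .build();

        // Convert to an Avro schema the way AvroConverter does internally.
        AvroData avroData = new AvroData(100);
        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(value);

        // Serializing the schema walks the record names; on the affected
        // converter versions this throws
        // org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
        System.out.println(avroSchema.toString());
    }
}
```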

Scenario 2: Providing an Avro schema in the connector definition does not work either, because the schema that JacksonFileReader extracts from the data does not match the provided Avro schema due to a naming issue.

The fields of the provided Avro schema look like:

```
[Field{name=integerField, index=0, schema=Schema{STRING}}, Field{name=structField, index=1, schema=Schema{structField:STRUCT}}]
```

The fields of the schema generated by the connector look like:

```
[Field{name=integerField, index=0, schema=Schema{STRUCT}}, Field{name=structField, index=1, schema=Schema{STRUCT}}]
```

The name of the struct field's schema differs, which produces this error:

```
org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:252)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
```
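Connect validates a Struct against the full schema, name included, so the anonymous struct built by the reader never matches the named one from the provided schema. A small sketch of the comparison that fails (field list simplified; names taken from the dumps above):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaNameMismatch {
    public static void main(String[] args) {
        // The provided Avro schema yields a struct with a name...
        Schema provided = SchemaBuilder.struct().name("structField")
                .field("structField1", Schema.STRING_SCHEMA) // fields simplified
                .build();
        // ...while the schema derived from the data is anonymous.
        Schema derived = SchemaBuilder.struct()
                .field("structField1", Schema.STRING_SCHEMA) // fields simplified
                .build();
        // ConnectSchema.equals() compares the name too, so this prints false;
        // Struct.put() -> ConnectSchema.validateValue() then raises
        // "Struct schemas do not match".
        System.out.println(provided.equals(derived));
    }
}
```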

Please let me know if there is any workaround for this issue.

mahendra971 · Sep 28 '21 10:09

How did you configure the Avro schema?

mmolimar · Nov 02 '21 17:11

By using `file_reader.avro.schema.value`.
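Roughly a schema of this shape for the sample file, with every nested record explicitly named (the record names here are illustrative, not the exact ones I used):

```json
{
  "type": "record",
  "name": "Value",
  "fields": [
    { "name": "integerField", "type": "string" },
    { "name": "structField", "type": {
        "type": "record",
        "name": "StructField",
        "fields": [
          { "name": "structField1", "type": {
              "type": "record",
              "name": "StructField1",
              "fields": [
                { "name": "integerField", "type": "string" },
                { "name": "longField",    "type": "string" },
                { "name": "stringField",  "type": "string" },
                { "name": "booleanField", "type": "boolean" },
                { "name": "decimalField", "type": "string" }
              ]
          } }
        ]
    } }
  ]
}
```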

mahendra971 · Dec 03 '21 12:12