
Avro Serialisation is not working

Open · ibalachandar86 opened this issue 2 years ago · 1 comment

Hi,

I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic. The deployment uses confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector connector jar is added to the image). The prerequisites, Kafka Connect configuration, and connector configuration are listed below.

Problem statement: the connector below works and I am able to ingest the CSV into the Kafka topic as a string. My goal is to Avro-serialise the CSV data before it is stored in the topic. I am not sure which serialisation configuration is missing from my Connect worker/connector properties.

Prerequisites: I have placed the CSV file in a directory inside the Kafka Connect pod and created a schema for the CSV in the Confluent Schema Registry.
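For reference, a minimal sketch of how such a schema can be registered through the Schema Registry REST API, assuming the subject name sampledata-value and the in-cluster service name test-cp-schema-registry used elsewhere in this issue:

curl -X POST http://test-cp-schema-registry:8081/subjects/sampledata-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}'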

Below are the Kafka Connect (cp-helm-charts values) details:

cp-control-center:
  enabled: false

cp-kafka:
  enabled: true

cp-kafka-rest:
  enabled: false

cp-ksql-server:
  enabled: false

cp-schema-registry:
  enabled: true

cp-zookeeper:
  enabled: true

cp-kafka-connect:
  replicaCount: 1

  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always

  servicePort: 8083

  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"

  heapOptions: "-Xms5g -Xmx10g"

  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"

  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"

  cp-schema-registry:
    url: "test-cp-schema-registry:8081"

fullnameOverride: test
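As far as I understand, cp-helm-charts renders each configurationOverrides entry as a CONNECT_-prefixed environment variable on the Connect container (dots replaced by underscores, upper-cased), so the worker-level converter settings can be checked on the running pod; the deployment name below is illustrative:

kubectl exec deploy/test-cp-kafka-connect -- env | grep CONVERTER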

Below are the Kafka connector details:

curl -X POST \
  http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "sample",
    "config": {
      "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
      "tasks.max": "1",
      "fs.uris": "/home/appuser/csv",
      "topic": "sampledata",
      "use.latest.version": "true",
      "auto.register.schemas": "false",
      "poll.interval.ms": "10000",
      "auto.create.topics": "false",
      "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
      "policy.batch_size": "0",
      "policy.recursive": "true",
      "policy.regexp": "^*.csv$",
      "policy.resume.on.error": "false",
      "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
      "key.enhanced.avro.schema.support": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
      "value.enhanced.avro.schema.support": "true",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
      "file_reader.delimited.settings.format.quote": "\"",
      "file_reader.delimited.settings.escape_unquoted": "false",
      "file_reader.delimited.compression.type": "none",
      "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
      "file_reader.delimited.settings.delimiter_detection": "false",
      "file_reader.delimited.compression.concatenated": "true",
      "file_reader.delimited.settings.format.comment": "#",
      "file_reader.delimited.settings.format.quote_escape": "\"",
      "file_reader.delimited.settings.format.delimiter": ",",
      "file_reader.encryption.passphrase": "",
      "file_reader.delimited.settings.max_chars_per_column": "4096",
      "file_reader.delimited.settings.line_separator_detection": "false",
      "file_reader.delimited.settings.format.line_separator": "\n",
      "file_reader.delimited.settings.max_columns": "512",
      "file_reader.encryption.type": "NONE",
      "file_reader.delimited.settings.header": "true",
      "file_reader.delimited.settings.ignore_leading_whitespaces": "true",
      "file_reader.delimited.settings.rows_to_skip": "0",
      "file_reader.batch_size": "0",
      "file_reader.encryption.secret": ""
    }
  }'
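For reference, the deployed connector and the configuration it is actually running with can be inspected back through the standard Kafka Connect REST API:

curl http://localhost:8083/connectors/sample/status
curl http://localhost:8083/connectors/sample/config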

CSV file:

c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3

Schema in Schema Registry:

{"subject":"sampledata-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}
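The same subject can be read back from the Schema Registry REST API, e.g. (service name as used in the configuration above):

curl http://test-cp-schema-registry:8081/subjects/sampledata-value/versions/latest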

Data in topic:

/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
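For comparison, the Avro-aware console consumer shipped with the Confluent images decodes records through the Schema Registry instead of printing raw bytes, which helps to tell whether the topic actually holds Avro data; the service names below follow the ones used earlier in this issue:

kafka-avro-console-consumer --topic sampledata --from-beginning \
  --bootstrap-server test-cp-kafka-headless:9092 \
  --property schema.registry.url=http://test-cp-schema-registry:8081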

ibalachandar86 · Jul 16 '22 12:07

Any update on this? We are facing a similar issue with Parquet as the source.

baratamavinash225 · Feb 03 '23 11:02