Avro Serialisation is not working
Hi,
I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic. I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector JAR added). The prerequisites, Kafka Connect configuration, and connector details are below.
Problem statement: The connector below works and I am able to ingest the CSV into the Kafka topic as strings. My goal is to Avro-serialise the CSV data before it is stored in the topic. I am not sure which serialisation configuration is missing from my Connect worker / connector properties.
Prerequisites: I have placed the CSV file in a directory inside the Kafka Connect pod and created a schema for the CSV in the Confluent Schema Registry.
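For reference, a value schema like the one below can be registered via the Schema Registry REST API roughly as follows (a sketch; the subject name sampledata-value and the registry URL match the setup described below):

curl -X POST http://test-cp-schema-registry:8081/subjects/sampledata-value/versions \
  -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  -d '{"schema":"{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}'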
Below are the Kafka Connect details (Helm values for cp-helm-charts):

cp-control-center:
  enabled: false
cp-kafka:
  enabled: true
cp-kafka-rest:
  enabled: false
cp-ksql-server:
  enabled: false
cp-schema-registry:
  enabled: true
cp-zookeeper:
  enabled: true
cp-kafka-connect:
  replicaCount: 1
  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always
  servicePort: 8083
  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"
  heapOptions: "-Xms5g -Xmx10g"
  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"
  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"
  cp-schema-registry:
    url: "test-cp-schema-registry:8081"
fullnameOverride: test
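As a sanity check (a sketch, assuming the cp-helm-charts behaviour of turning configurationOverrides into CONNECT_*-prefixed environment variables; <connect-pod> is a placeholder for the actual pod name), the effective converter settings can be inspected inside the running pod:

kubectl exec <connect-pod> -- env | grep -E 'CONNECT_(KEY|VALUE)_CONVERTER'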
Below are the Kafka connector details:

curl -X POST \
  http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "sample",
    "config": {
      "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
      "tasks.max": "1",
      "fs.uris": "/home/appuser/csv",
      "topic": "sampledata",
      "use.latest.version": "true",
      "auto.register.schemas": "false",
      "poll.interval.ms": "10000",
      "auto.create.topics": "false",
      "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
      "policy.batch_size": "0",
      "policy.recursive": "true",
      "policy.regexp": "^*.csv$",
      "policy.resume.on.error": "false",
      "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
      "key.enhanced.avro.schema.support": "true",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
      "value.enhanced.avro.schema.support": "true",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "file_reader.delimited.settings.format.quote": "\"",
      "file_reader.delimited.settings.escape_unquoted": "false",
      "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
      "file_reader.delimited.compression.type": "none",
      "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
      "file_reader.delimited.settings.delimiter_detection": "false",
      "file_reader.delimited.compression.concatenated": "true",
      "file_reader.delimited.settings.format.comment": "#",
      "file_reader.delimited.settings.format.quote_escape": "\"",
      "file_reader.delimited.settings.format.delimiter": ",",
      "file_reader.encryption.passphrase": "",
      "file_reader.delimited.settings.max_chars_per_column": "4096",
      "file_reader.delimited.settings.line_separator_detection": "false",
      "file_reader.delimited.settings.format.line_separator": "\n",
      "file_reader.delimited.settings.max_columns": "512",
      "file_reader.encryption.type": "NONE",
      "file_reader.delimited.settings.header": "true",
      "file_reader.delimited.settings.ignore_leading_whitespaces": "true",
      "file_reader.delimited.settings.rows_to_skip": "0",
      "file_reader.batch_size": "0",
      "file_reader.encryption.secret": ""
    }
  }'
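To confirm the connector has picked up these settings and the task is running, the standard Kafka Connect REST endpoints can be queried:

curl http://localhost:8083/connectors/sample/status
curl http://localhost:8083/connectors/sample/config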
CSV file:

c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3
Schema in Schema Registry:

{"subject":"sampledata-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}
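(For reference, this is the response returned by the standard Schema Registry subject/version endpoint, e.g. curl http://test-cp-schema-registry:8081/subjects/sampledata-value/versions/latest.)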
Data in topic:

/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
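Note that the plain kafka-console-consumer prints the raw record bytes and cannot decode Avro. To check whether the records are actually Avro-encoded against the registered schema, kafka-avro-console-consumer can be used (a sketch; the tool ships with the Confluent Schema Registry / Connect images, so it may need to be run from a different container):

kafka-avro-console-consumer --topic sampledata --from-beginning \
  --bootstrap-server cef-cp-kafka-headless:9092 \
  --property schema.registry.url=http://test-cp-schema-registry:8081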
Any update on this? We are facing a similar issue with Parquet as the source.