kafka-connect-spooldir
Integer values show as null when fetching JSON data
Hi, I tried to fetch data from a JSON file using "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector".
The problem is that integer and float values show up as null in the topic. Everything else is fine: string values appear in the topic as expected. For reference, here is the config I used:
{
"name": "source_json_file_source_connector10",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
"topic": "new_json_int_data_test3",
"input.path": "/home/dahal/kafka-3.0.0-src/share/data/json",
"finished.path": "/home/dahal/kafka-3.0.0-src/share/data/json/processed",
"error.path": "/home/dahal/kafka-3.0.0-src/share/data/json/error",
"input.file.pattern": ".*\\.json",
"empty.poll.wait.ms": 10000,
"schema.generation.enabled": true
}
}
topic record:
rowtime: 2022/01/12 07:18:14.144 Z, key: {"schema":{"type":"struct","fields":[],"optional":false,"name":"com.github.jcustenborder.kafka.connect.model.Key"},"payload":{}}, value: {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"id"},{"type":"string","optional":true,"field":"employee_name"},{"type":"double","optional":true,"field":"employee_salary"},{"type":"string","optional":true,"field":"employee_age"},{"type":"string","optional":true,"field":"profile_image"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.model.Value"},"payload":{"id":null,"employee_name":"Tiger Nixon","employee_salary":null,"employee_age":null,"profile_image":""}}, partition: 0
JSON data:
{
"id": 1,
"employee_name": "Tiger Nixon",
"employee_salary": 320800,
"employee_age": 61,
"profile_image": ""
}
If anyone can help me with this, that will be awesome.
I don't know why, but when value.schema is not passed in the config, the fields default to STRING, and the integer values are not converted to STRING, so I got null values in that case. The solution I found is to pass the key and value schemas through the config: integer fields can be declared as INT8, INT16, INT32 or INT64, and floating-point fields as FLOAT32 or FLOAT64.
"key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{}}",
"value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\" : {\"type\" : \"INT64\",\"isOptional\" : false},\"employee_name\":{\"type\":\"STRING\",\"isOptional\":true},\"employee_salary\" : {\"type\" : \"FLOAT64\",\"isOptional\" : true},\"employee_age\":{\"type\":\"INT64\",\"isOptional\":false},\"profile_image\":{\"type\":\"STRING\",\"isOptional\":true}}}"
Also, it is better to pass an explicit schema than to rely on auto-generation: with auto-generation the connector crashes when the source file is not present, whereas with an explicit schema it does not.
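For anyone landing here later, this is roughly what the full connector config looks like with the explicit schemas merged in. It is a sketch rather than a verified setup: the name, topic, and paths are the ones from the original post, and setting "schema.generation.enabled" to false is my assumption about how to make the connector rely only on the supplied schemas.

```json
{
  "name": "source_json_file_source_connector10",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector",
    "topic": "new_json_int_data_test3",
    "input.path": "/home/dahal/kafka-3.0.0-src/share/data/json",
    "finished.path": "/home/dahal/kafka-3.0.0-src/share/data/json/processed",
    "error.path": "/home/dahal/kafka-3.0.0-src/share/data/json/error",
    "input.file.pattern": ".*\\.json",
    "empty.poll.wait.ms": 10000,
    "schema.generation.enabled": false,
    "key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{}}",
    "value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false},\"employee_name\":{\"type\":\"STRING\",\"isOptional\":true},\"employee_salary\":{\"type\":\"FLOAT64\",\"isOptional\":true},\"employee_age\":{\"type\":\"INT64\",\"isOptional\":false},\"profile_image\":{\"type\":\"STRING\",\"isOptional\":true}}}"
  }
}
```

Note that "id" and "employee_age" are declared with isOptional set to false, so a record missing either field would presumably be rejected rather than written with a null.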
Can you run the connector with trace logging and this file? If it's having trouble with the conversion, it might output why.
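One way to turn on trace logging without restarting the worker, assuming a Kafka Connect version that exposes the dynamic logger endpoint (2.4 or later), is to send a PUT request to /admin/loggers/com.github.jcustenborder.kafka.connect.spooldir on the worker's REST port with the body below. The logger name is an assumption taken from the connector's package name; it may need adjusting if the conversion code logs under a different package.

```json
{
  "level": "TRACE"
}
```

The level can be set back to INFO the same way once the trace output has been captured.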