kafka-connect-spooldir
Unable to write JSON schemaless events
Hi @jcustenborder,
I’m having some trouble trying to use SMT functions with SpoolDirSchemaLessJsonSourceConnector.
I would like to simply ingest some schemaless JSON events from a file into a topic, applying the ReplaceField SMT function. Here is the connector configuration:
name=testReplaceField
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSchemaLessJsonSourceConnector
tasks.max=1
topic=testReplaceField
input.path=/tmp/data
input.file.pattern=test-replaceField-no-schema.json
error.path=/tmp/data/error
finished.path=/tmp/data/finished
halt.on.error=false
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2
My sample source file is populated as follows:
{"foo": 1, "bar": "test1"}
{"foo": 2, "bar": "test2"}
{"foo": 3, "bar": "test3"}
Although I disabled schemas for both the key and the value, it seems the SMT function is still interpreting my JSON events as if they had a schema. I get the following exception in the connector logs:
[2021-06-16 17:23:36,933] ERROR WorkerSourceTask{id=testReplaceField-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [field replacement], found: java.lang.String
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at org.apache.kafka.connect.transforms.ReplaceField.applyWithSchema(ReplaceField.java:167)
at org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:146)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 11 more
As you can see in the stack trace, it keeps going into the applyWithSchema method, which obviously fails!
As suggested here, I already tried to use StringConverter instead of JsonConverter, but with no luck. Same error.
Am I doing something wrong?
Thanks in advance!
Regards,
Mauro
Hi @mroiter-larus,
Kafka Connect is a little weird. It has its own type system that is independent of how the data is serialized. For example, a Struct is basically a row-like structure; in JSON this would be an object like your examples, and in Avro it's a Record. So what is happening is that this connector uses Jackson to stream the file, breaking on each object boundary, and hands each object off as a string. If you were using only the StringConverter you would be done. In your example you're running a transformation, so the pipeline looks like this: SpoolDirSchemaLessJsonSourceConnector -> RenameField -> Converter
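To make that concrete: the built-in SMTs pick their code path based on whether the record has a schema attached. Roughly like this (paraphrased from the Apache Kafka org.apache.kafka.connect.transforms source; a sketch, not the exact code):

// Inside an SMT such as ReplaceField (paraphrased, not verbatim):
@Override
public R apply(R record) {
    if (operatingSchema(record) == null) {
        // Only taken when the record carries no schema at all.
        return applySchemaless(record);
    } else {
        // Taken in your case: the record arrives with a string schema,
        // and requireStruct() then rejects the String payload.
        return applyWithSchema(record);
    }
}

Since your stack trace shows applyWithSchema, the records must be arriving with a (string) schema attached. That's also why schemas.enable=false on the converters changes nothing here: for a source connector the converter runs after the transformation chain, as in the pipeline above.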
It's breaking at the RenameField step. That SMT is saying it doesn't support Strings. You have a couple of options. You could use something like FromJson to parse the data against a JSON schema, then run your rename (a sketch follows the link below). Another option would be to convert it afterwards with something like KSQL.
https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-json-schema/transformations/examples/FromJson.inline.html
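If it helps, here's a sketch of how that transform chain might be wired into your config. This assumes the kafka-connect-json-schema plugin is installed; the json.schema.* property names are taken from the linked FromJson example page, so double-check them there:

# Sketch only: requires the kafka-connect-json-schema plugin on the plugin path.
# Property names follow the linked FromJson example; verify against that page.
transforms=FromJson,RenameField
transforms.FromJson.type=com.github.jcustenborder.kafka.connect.json.FromJson$Value
transforms.FromJson.json.schema.location=Inline
transforms.FromJson.json.schema.inline={"type":"object","properties":{"foo":{"type":"number"},"bar":{"type":"string"}}}
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=foo:c1,bar:c2

FromJson runs first and turns the raw string into a Struct, so by the time RenameField sees the record it is no longer a plain String.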
Hi @jcustenborder,
Thanks a lot for your answer!
I tried the first approach you suggested (using FromJson). It works, but in doing so the JSON events, which were initially schemaless, are treated as if they had a schema (and I think this is exactly what the FromJson transformation is expected to do). I mean, the RenameField step is still running the applyWithSchema method, and I would like to prevent this.
Is there a way to run the ReplaceField step so that the executed method is applySchemaless?
Thanks a lot!
Off the top of my head I'm not sure. I don't know how much I would worry about it being schemaless or with a schema. The converter could be the Apache Kafka JsonConverter, which would remove the schema, or you can use the JsonSchemaConverter I wrote, which attaches the schema as a record header. It's in the same project. Either way you get JSON in your topic.
https://github.com/jcustenborder/kafka-connect-json-schema/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/json/JsonSchemaConverter.java
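For reference, the two options as connector properties. Option 1 is the stock Apache Kafka converter from the original config; option 2 is a sketch using the class linked above (check that project's README for any additional settings it may need):

# Option 1: plain JSON payload, schema stripped by the converter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Option 2 (sketch): JsonSchemaConverter from kafka-connect-json-schema,
# which attaches the schema as a record header instead of embedding it
value.converter=com.github.jcustenborder.kafka.connect.json.JsonSchemaConverter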