kafka-connect-scylladb
No support for nested JSON
Extracted from the users' Slack channel.
{
"group_id": 1010,
"competition_count": {
"val": 20,
"type": "ts"
},
"session_count": {
"val": 20,
"type": "ts"
}
}
ScyllaDB create statements:
CREATE TYPE test.custom_type (val int, type text);
CREATE TABLE test.sample (
group_id bigint,
competition_count FROZEN<custom_type>,
session_count FROZEN<custom_type>,
PRIMARY KEY (group_id))
WITH default_time_to_live = 172800;
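For reference, the intended mapping can be expressed as a plain CQL insert built from the sample payload above (the UDT values use CQL's literal syntax; the TTL comes from the table's default_time_to_live, so no USING TTL clause is needed):
INSERT INTO test.sample (group_id, competition_count, session_count)
VALUES (1010, {val: 20, type: 'ts'}, {val: 20, type: 'ts'});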
By default, the connector converts the nested JSON into a map and then fails, throwing an exception similar to the one below:
[2020-07-08 12:51:41,111] WARN [kafka-connect-scylladb-prod-json|task-0] Exception occurred while extracting records from Kafka Sink Records, ignoring and processing next set of records. (io.connect.scylladb.ScyllaDbSinkTask:252)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'competition_count'
at io.connect.scylladb.RecordConverter.convertMap(RecordConverter.java:167)
at io.connect.scylladb.RecordConverter.findRecordTypeAndConvert(RecordConverter.java:95)
at io.connect.scylladb.RecordConverter.convert(RecordConverter.java:82)
at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:87)
at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Is there a workaround to save nested JSON into ScyllaDB without losing data type information?
Thanks for reporting, @slivne. The Kafka connector should support Avro, simple JSON, or string formats; I'm not sure about nested JSON. I don't think we included that in the initial requirements, so I will add it as a requirement now.
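One possible interim workaround, sketched here as an assumption rather than documented connector behavior: redefine the table with plain scalar columns instead of the frozen UDTs (e.g. competition_count_val int, competition_count_type text) and use Kafka Connect's built-in Flatten transform to collapse the nested objects into top-level fields, which keeps the primitive types at the cost of the UDT structure:
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
With the _ delimiter, competition_count.val arrives as competition_count_val and maps onto the corresponding flattened column.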