
No support for nested JSON

Open · slivne opened this issue 4 years ago · 1 comment

Extracted from the users' Slack channel:

{
  "group_id": 1010,
  "competition_count": {
    "val": 20,
    "type": "ts"
  },
  "session_count": {
    "val": 20,
    "type": "ts"
  }
}
ScyllaDB CREATE statements:
CREATE TYPE test.custom_type (val int, type text);
CREATE TABLE test.sample (
    group_id bigint,
    competition_count FROZEN<custom_type>,
    session_count FROZEN<custom_type>,
    PRIMARY KEY (group_id))
WITH default_time_to_live = 172800;
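
For reference, the intended mapping of the sample message above onto this schema would correspond to a manually written statement like the following (values copied from the JSON; this only illustrates the expected result, not something the connector currently produces):

INSERT INTO test.sample (group_id, competition_count, session_count)
VALUES (1010, {val: 20, type: 'ts'}, {val: 20, type: 'ts'});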

By default, the connector converts the nested JSON into a map and fails with an exception similar to the one below:

[2020-07-08 12:51:41,111] WARN [kafka-connect-scylladb-prod-json|task-0] Exception occurred while extracting records from Kafka Sink Records, ignoring and processing next set of records. (io.connect.scylladb.ScyllaDbSinkTask:252)
org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'competition_count'
	at io.connect.scylladb.RecordConverter.convertMap(RecordConverter.java:167)
	at io.connect.scylladb.RecordConverter.findRecordTypeAndConvert(RecordConverter.java:95)
	at io.connect.scylladb.RecordConverter.convert(RecordConverter.java:82)
	at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:87)
	at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:545)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
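
For context, the converter settings were not included in the report; a typical sink configuration that hands nested JSON to the connector as schemaless maps (assumed here) would contain:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"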

Is there a workaround to save nested JSON into ScyllaDB without losing data type information?
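
For anyone looking for an interim workaround (an assumption, not something the connector documents): the nested fields could be flattened before they reach the sink using Kafka Connect's built-in Flatten transform, at the cost of losing the UDT structure (the table would need flat columns such as competition_count_val instead of the frozen UDT). A sketch of the relevant connector config fragment, with the transform name chosen here just as an example:

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"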

slivne avatar Jul 14 '20 08:07 slivne

Thanks for reporting, @slivne. The Kafka connector should support Avro, simple JSON, or string formats; I'm not sure about nested JSON. I don't think we included it in the initial requirements, so I will add it as a requirement now.

mailmahee avatar Jul 16 '20 01:07 mailmahee