stream-reactor
stream-reactor copied to clipboard
KafkaConnect-InfluDB sink - WITHTIMESTAMP is not accepting a json path
###Connector configuration
{
"connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
"tasks.max": "1",
"topics": "topic1",
"connect.influx.kcql": "INSERT INTO registrations SELECT after.CREATION_DATE FROM ebsmysql.AR.HZ_PARTIES WITHTIMESTAMP after.CREATION_DATE WITHTAG (op,after.CREATION_DATE)",
"connect.influx.url": "http://influxdb:8086",
"connect.influx.db":"mydb",
"connect.influx.username":"xxxx",
"connect.influx.password":"xxxxx"
}
once the connector is deployed , it's throwing error message like
[2018-01-10 21:35:32,737] ERROR Task dashboard-registrations-influx-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
java.lang.IllegalArgumentException: Invalid field selection for 'after'. It doesn't resolve to a primitive field. It resolves to:Schema{ebsmysql.AR.HZ_PARTIES.Value:STRUCT}
at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$$anonfun$com$datamountaineer$streamreactor$connect$influx$writers$ValuesExtractor$$innerExtract$2$2.apply(ValuesExtractor.scala:393)
at scala.Option.getOrElse(Option.scala:121)
at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$.com$datamountaineer$streamreactor$connect$influx$writers$ValuesExtractor$$innerExtract$2(ValuesExtractor.scala:352)
at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$.extract(ValuesExtractor.scala:413)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder$$anonfun$buildPointFromStruct$2$$anonfun$15.apply(InfluxBatchPointsBuilder.scala:200)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder$$anonfun$buildPointFromStruct$2$$anonfun$15.apply(InfluxBatchPointsBuilder.scala:199)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.build(InfluxBatchPointsBuilder.scala:96)
at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.write(InfluxDbWriter.scala:44)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:73)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:73)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.put(InfluxSinkTask.scala:73)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-10 21:35:32,738] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-01-10 21:35:32,738] ERROR Task dashboard-registrations-influx-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-01-10 21:35:32,739] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
[2018-01-10 21:35:32,739] INFO Stopping InfluxDb sink. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask:85)
after.CREATION_DATE works in selection fields and WITHTAG fields but not working when I use it in WITHTIMESTAMP section.
Any help? or workarounds?
Can you please share the entire error stack? You cut off the start
updated the error log
Your timestamp field points to a structure: invalidfieldselection -
..to:Schema{ebsmysql.AR.HZ_PARTIES.Value:STRUCT}
. You can't select a struct, you need to select a field which is LONG or it represents a Date.
Which connector version are you using?
Using the below link for connector jar https://github.com/Landoop/stream-reactor/releases/download/0.3.0/kafka-connect-influxdb-0.3.0-3.3.0-all.tar.gz
For timestamp field I used "AFTER.CREATION_DATE"
Where "AFTER" refers to the STRUCT type and inside AFTER -> CREATION_DATE filed is of type long
Basically KCQL query is ignoring CREATION_DATE field when its used in WITHTIMESTAMP section like "after.CREATION_DATE"