
KafkaConnect-InfluxDB sink - WITHTIMESTAMP is not accepting a JSON path

Open · vijayambati1 opened this issue on Jan 10, 2018 · 6 comments

### Connector configuration

```json
{
  "connector.class": "com.datamountaineer.streamreactor.connect.influx.InfluxSinkConnector",
  "tasks.max": "1",
  "topics": "topic1",
  "connect.influx.kcql": "INSERT INTO registrations SELECT after.CREATION_DATE FROM ebsmysql.AR.HZ_PARTIES WITHTIMESTAMP after.CREATION_DATE WITHTAG (op,after.CREATION_DATE)",
  "connect.influx.url": "http://influxdb:8086",
  "connect.influx.db": "mydb",
  "connect.influx.username": "xxxx",
  "connect.influx.password": "xxxxx"
}
```
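For reference, this is roughly the kind of record the KCQL above targets. This is only my sketch: the Debezium-style envelope and the sample values are assumptions; only the field names come from the query.

```jsonc
// Sketch of an assumed record on topic ebsmysql.AR.HZ_PARTIES (values are made up)
{
  "op": "c",                          // change type, referenced by WITHTAG (op, ...)
  "after": {                          // STRUCT holding the row image after the change
    "CREATION_DATE": 1515618932000    // epoch millis (long), referenced as after.CREATION_DATE
  }
}
```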

Once the connector is deployed, it throws an error like this:

```
[2018-01-10 21:35:32,737] ERROR Task dashboard-registrations-influx-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
java.lang.IllegalArgumentException: Invalid field selection for 'after'. It doesn't resolve to a primitive field. It resolves to:Schema{ebsmysql.AR.HZ_PARTIES.Value:STRUCT}
	at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$$anonfun$com$datamountaineer$streamreactor$connect$influx$writers$ValuesExtractor$$innerExtract$2$2.apply(ValuesExtractor.scala:393)
	at scala.Option.getOrElse(Option.scala:121)
	at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$.com$datamountaineer$streamreactor$connect$influx$writers$ValuesExtractor$$innerExtract$2(ValuesExtractor.scala:352)
	at com.datamountaineer.streamreactor.connect.influx.writers.ValuesExtractor$.extract(ValuesExtractor.scala:413)
	at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder$$anonfun$buildPointFromStruct$2$$anonfun$15.apply(InfluxBatchPointsBuilder.scala:200)
	at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder$$anonfun$buildPointFromStruct$2$$anonfun$15.apply(InfluxBatchPointsBuilder.scala:199)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at com.datamountaineer.streamreactor.connect.influx.writers.InfluxBatchPointsBuilder.build(InfluxBatchPointsBuilder.scala:96)
	at com.datamountaineer.streamreactor.connect.influx.writers.InfluxDbWriter.write(InfluxDbWriter.scala:44)
	at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:73)
	at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask$$anonfun$put$2.apply(InfluxSinkTask.scala:73)
	at scala.Option.foreach(Option.scala:257)
	at com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask.put(InfluxSinkTask.scala:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2018-01-10 21:35:32,738] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2018-01-10 21:35:32,738] ERROR Task dashboard-registrations-influx-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
[2018-01-10 21:35:32,739] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
[2018-01-10 21:35:32,739] INFO Stopping InfluxDb sink. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask:85)
```

after.CREATION_DATE works in the SELECT fields and in WITHTAG, but it does not work when I use it in the WITHTIMESTAMP clause; the sketch below shows the two variants I'm comparing.
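```jsonc
// Works: the nested path is used only in SELECT and WITHTAG
{ "connect.influx.kcql": "INSERT INTO registrations SELECT after.CREATION_DATE FROM ebsmysql.AR.HZ_PARTIES WITHTAG (op,after.CREATION_DATE)" }

// Fails: the same nested path in WITHTIMESTAMP triggers the exception above
{ "connect.influx.kcql": "INSERT INTO registrations SELECT after.CREATION_DATE FROM ebsmysql.AR.HZ_PARTIES WITHTIMESTAMP after.CREATION_DATE WITHTAG (op,after.CREATION_DATE)" }
```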

Any help or workarounds?

vijayambati1 · Jan 10 '18 21:01

Can you please share the entire error stack? You cut off the start.

stheppi · Jan 10 '18 21:01

Updated the error log.

vijayambati1 · Jan 10 '18 21:01

Your timestamp field points to a structure. The error says: `Invalid field selection for 'after'. It doesn't resolve to a primitive field. It resolves to: Schema{ebsmysql.AR.HZ_PARTIES.Value:STRUCT}`. You can't select a struct; you need to select a field which is a LONG or represents a Date.
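For illustration, a KCQL that satisfies that constraint might look like the sketch below. This is not a confirmed fix: sys_time() stamps points with ingestion time instead of the record field, and whether the KCQL version bundled with this 0.3.0 build supports sys_time() (or nested paths in WITHTIMESTAMP) is something to verify.

```jsonc
// Sketch only: use system time as the point timestamp instead of the nested struct path
{ "connect.influx.kcql": "INSERT INTO registrations SELECT after.CREATION_DATE FROM ebsmysql.AR.HZ_PARTIES WITHTIMESTAMP sys_time() WITHTAG (op,after.CREATION_DATE)" }
```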

stheppi · Jan 10 '18 21:01

Which connector version are you using?

stheppi · Jan 10 '18 22:01

I'm using the connector jar from this link: https://github.com/Landoop/stream-reactor/releases/download/0.3.0/kafka-connect-influxdb-0.3.0-3.3.0-all.tar.gz

For the timestamp field I used "AFTER.CREATION_DATE", where "AFTER" refers to the STRUCT type and the CREATION_DATE field inside AFTER is of type long.

vijayambati1 · Jan 10 '18 22:01

Basically, the KCQL query is ignoring the CREATION_DATE field when it's used in the WITHTIMESTAMP clause as "after.CREATION_DATE".

vijayambati1 · Jan 10 '18 22:01