stream-reactor
Can't get Redis GeoAdd working
What version of the Stream Reactor are you reporting this issue for?
4.2.0
Are you running the correct version of Kafka/Confluent for the Stream reactor release?
yes
What is the expected behaviour?
I'm trying to insert this Kafka record:
{"latitude":60.203144,"longitude":24.966624,"oper":"22","veh":"1190"}
into Redis with the KCQL: INSERT INTO buses: SELECT veh from geo_test PK oper STOREAS GEOADD
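To make the expected result concrete, here is a rough Python sketch of the Redis GEOADD call this record might translate to. It is not the connector's code. The helper `geoadd_args` is hypothetical, and the mapping is an assumption: key built from the `buses:` prefix plus the `oper` PK value, member built as JSON from the selected `veh` field, and coordinates read from the `longitude` and `latitude` fields. The only part taken from Redis itself is the argument order, where GEOADD takes longitude before latitude.

```python
import json

# The Kafka record value from the report.
record = json.loads(
    '{"latitude":60.203144,"longitude":24.966624,"oper":"22","veh":"1190"}'
)


def geoadd_args(record, prefix="buses:", pk_field="oper", member_fields=("veh",)):
    """Build the argument list for a Redis GEOADD call (hypothetical helper).

    Assumed mapping, not confirmed against the connector source:
    key = prefix + PK value, member = JSON of the selected fields,
    coordinates = the record's 'longitude' and 'latitude' fields.
    """
    key = prefix + str(record[pk_field])
    member = json.dumps(
        {field: record[field] for field in member_fields},
        separators=(",", ":"),
    )
    # Redis GEOADD expects longitude first, then latitude.
    return ["GEOADD", key, record["longitude"], record["latitude"], member]


args = geoadd_args(record)
print(" ".join(str(a) for a in args))
```

If the connector's actual key or member format differs, only the body of `geoadd_args` needs to change; the coordinate order stays the same.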
What was observed?
An exception is thrown right after the connector starts.
What is your connector properties configuration ?
{
  "connect.redis.kcql": "INSERT INTO buses: SELECT veh from geo_test PK oper STOREAS GeoAdd",
  "name": "redis-embeddings-devrel-sebastien.aivencloud.com",
  "connect.redis.host": "##",
  "connector.class": "com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkConnector",
  "connect.redis.port": "22234",
  "connect.redis.password": "##",
  "errors.log.enable": "true",
  "connect.redis.ssl.enabled": "true",
  "connect.redis.error.policy": "NOOP",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.include.messages": "true",
  "connect.redis.retry.interval": "6000",
  "topics": "geo_test",
  "connect.redis.max.retries": "20",
  "value.converter.schemas.enable": "false"
}
Please provide full log files (redact any sensitive information)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: requirement failed: ErrorTracker is not set call. Initialize.
at scala.Predef$.require(Predef.scala:337)
at com.datamountaineer.streamreactor.common.errors.ErrorHandler.handleTry(ErrorHandler.scala:48)
at com.datamountaineer.streamreactor.common.errors.ErrorHandler.handleTry$(ErrorHandler.scala:47)
at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisWriter.handleTry(RedisWriter.scala:31)
at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.$anonfun$insert$1(RedisGeoAdd.scala:117)
at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.$anonfun$insert$1$adapted(RedisGeoAdd.scala:54)
at scala.collection.immutable.HashMap.foreach(HashMap.scala:1076)
at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.insert(RedisGeoAdd.scala:54)
at com.datamountaineer.streamreactor.connect.redis.sink.writer.RedisGeoAdd.write(RedisGeoAdd.scala:49)
at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.$anonfun$put$2(RedisSinkTask.scala:239)
at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.$anonfun$put$2$adapted(RedisSinkTask.scala:239)
at scala.collection.immutable.List.foreach(List.scala:333)
at com.datamountaineer.streamreactor.connect.redis.sink.RedisSinkTask.put(RedisSinkTask.scala:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
... 11 more
I've also seen the same behaviour.
This has been fixed. Please use the latest 6.3.0 release.