kafka-connect-jdbc
JDBC sink fails to Postgres database
I encounter the following error when I sink a topic to my Postgres database:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:116)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:69)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
... 10 more
My sink connector config:
{
  "name": "pg_sink_S_COUNTS_VA_INTERMEDIATE",
  "config": {
    "topics": "S_COUNTS_VA_INTERMEDIATE",
    "tasks.max": 1,
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "schema.ignore": "true",
    "schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "type.name": "kafka-connect",
    "connection.url": "jdbc:postgresql://*****.com:5433/kafka-sink",
    "connection.user": "****",
    "connection.password": "*****",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "pk.mode": "kafka",
    "pk.fields": "__connect_topic,__connect_partition,__connect_offset",
    "batch.size": 30
  }
}
ENVIRONMENT variables of my kafka-connect container:
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
CONNECT_REST_PORT: 8083
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_PLUGIN_PATH: /usr/share/java
I have a separate producer which can produce JSON data to Kafka without a schema defined.
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
We'll keep this open so that we catch this earlier and give a much better exception message.
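For reference, a minimal sketch of what that looks like if you stay with plain JSON (the field name below is only illustrative): the sink's value converter must have schemas enabled, and every message then has to carry a schema/payload envelope.

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [ { "field": "id", "type": "string", "optional": true } ]
  },
  "payload": { "id": "1" }
}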
Facing the same issue with MongoDB as source and MySQL as sink. Request your help. "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false"
And if I have a schemaless origin topic, is it possible to create a separate Avro schema file for the topic data and use it so that I have a well-defined schema?
@krishkir As explained, JDBC sink connector requires schemas to be enabled
@fabiotc You'd have to write a custom Connect SMT or other processor that could parse your topic and return a new record with a schema applied
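As a rough illustration (not a drop-in implementation), such an SMT could look something like the sketch below. The package, class name, and hard-coded field list are placeholders; a real transform would build the schema from configuration. It wraps a schemaless Map value in a Connect Struct so that the JDBC sink sees structured data.

package example; // hypothetical package

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class ApplySchema<R extends ConnectRecord<R>> implements Transformation<R> {

  // Hard-coded schema for the sketch; a real SMT would derive this from a config property.
  private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
      .field("id", Schema.OPTIONAL_STRING_SCHEMA)
      .field("artist", Schema.OPTIONAL_STRING_SCHEMA)
      .field("song", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  @Override
  public R apply(R record) {
    Object value = record.value();
    if (!(value instanceof Map)) {
      return record; // pass through anything that is not a schemaless map
    }
    Map<?, ?> map = (Map<?, ?>) value;
    Struct struct = new Struct(VALUE_SCHEMA);
    for (Field field : VALUE_SCHEMA.fields()) {
      Object fieldValue = map.get(field.name());
      if (fieldValue != null) {
        struct.put(field, fieldValue.toString()); // every field is a string in this sketch
      }
    }
    // Re-emit the record with the schema attached to the value
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), VALUE_SCHEMA, struct, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {
  }

  @Override
  public void close() {
  }
}

You'd package that as a jar on the worker's plugin path and reference it via the transforms property of the sink connector config.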
Thanks @cricket007
Some notes on this: https://rmoff.net/2020/01/22/kafka-connect-classcastexception/
@cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is a SMT I wrote for appending schema to a record https://github.com/yousufdev/kafka-connect-append-schema, hope it helps.
Thanks @rmoff. @yousufdev thanks for this! And using your example, is it possible to append a schema to a complex and nested JSON structure?
@fabiotc yes, you can append a schema to a complex structure; just pass it in the 'schema' property of the SMT.
"value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false",
The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
I have this issue, need some help here please.
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 04:43:53,220] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 04:43:53,220] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
This is my source connector config (I'm pulling strings from a txt file):
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
key.converter.schemas.enable=false
file=demo-file.txt
tasks.max=1
value.converter.schemas.enable=false
topic=demo-2-distributed
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
This is my JDBC sink connector config (I'm trying to sink to Postgres):
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=postgres
topics=demo-2-distributed
tasks.max=1
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
auto.evolve=true
connection.user=postgres
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:postgresql://postgres:5432/postgres
value.converter=org.apache.kafka.connect.json.JsonConverter
insert.mode=upsert
key.converter=org.apache.kafka.connect.storage.StringConverter
java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
JDBC sink requires Structured data, not strings.
FileStream source only writes strings unless you use a HoistField transform, for example
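For example, something like this on the FileStream source connector (the target field name "line" is only illustrative):

# wrap each String line in a Struct with a single field
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line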
java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
JDBC sink requires Structured data, not strings.
FileStream source only writes strings unless you use a HoistField transform, for example
Hi, I have added the HoistField in the Connect source like this:
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line
Now I'm able to see the string as JSON in the topic, but I'm getting this issue in the JDBC sink connector:
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 05:52:49,638] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 05:52:49,639] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask
That error has already been answered in this thread
https://github.com/confluentinc/kafka-connect-jdbc/issues/609#issuecomment-577300246
How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself, it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.
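For example, a minimal JDBC source sketch could look like the following; the host, table, and column names are illustrative, and in practice you'd pair it with a schema-aware converter such as Avro:

# hypothetical JDBC source reading new rows by an incrementing id column
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:postgresql://source-db:5432/postgres
connection.user=postgres
connection.password=postgres
table.whitelist=songs
mode=incrementing
incrementing.column.name=id
topic.prefix=jdbc-
tasks.max=1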
That error has already been answered in this thread
How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself, it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.
Thank you so much, I changed the way I was doing it. I'm now sending the message directly to the topic with kafka-console-producer, including the schema and payload, and the sink is able to write those fields to the DB. Thank you for your help; I'm building a proof of concept to present at work.
kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic demo-2-distributed
{
"schema": {
"type": "struct", "optional": false, "version": 1, "fields": [
{ "field": "id", "type": "string", "optional": true },
{ "field": "artist", "type": "string", "optional": true },
{ "field": "song", "type": "string", "optional": true }
] },
"payload": {
"id": 1,
"artist": "Rick Astley",
"song": "Never Gonna Give You Up"
}
}
Kafka JDBC Sink Connector
{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.password": "postgres", "topics": "demo-2-distributed", "tasks.max": "1", "key.converter.schemas.enable": "false", "fields.whitelist": "Id,artist,song", "auto.evolve": "true", "connection.user": "postgres", "value.converter.schemas.enable": "true", "auto.create": "true", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "insert.mode": "upsert", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "pk.mode": "kafka", "pk.fields": "__connect_topic,__connect_partition,__connect_offset" }
Keeping the schema as part of each message isn't recommended. Rather, you can use the JSON Schema, Avro, or Protobuf console producers plus their corresponding converters.
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
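For example, a minimal Avro sketch, assuming a Schema Registry is running at http://schema-registry:8081 (the record name and field list are illustrative, and the exact CLI flags depend on your Confluent Platform version):

kafka-avro-console-producer --broker-list 127.0.0.1:9092 --topic demo-2-distributed \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"song","fields":[{"name":"id","type":"string"},{"name":"artist","type":"string"},{"name":"song","type":"string"}]}'

{"id": "1", "artist": "Rick Astley", "song": "Never Gonna Give You Up"}

The sink connector then uses the matching converter instead of JsonConverter:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"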
@cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is a SMT I wrote for appending schema to a record https://github.com/yousufdev/kafka-connect-append-schema, hope it helps.
@OneCricketeer, @fabiotc can you please star my SMT repo again? My account got compromised and deleted. Thanks