kafka-connect-jdbc

JDBC sink to Postgres database fails

Open kccheung opened this issue 5 years ago • 17 comments

I'm getting the following error when I sink a topic to my Postgres database:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:116)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:69)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
... 10 more

My sink connector config:

{
    "name": "pg_sink_S_COUNTS_VA_INTERMEDIATE",
    "config": {
        "topics": "S_COUNTS_VA_INTERMEDIATE",
        "tasks.max": 1,
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore": "true",
        "schema.ignore": "true",
        "schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "type.name": "kafka-connect",
        "connection.url": "jdbc:postgresql://*****.com:5433/kafka-sink",
        "connection.user": "****",
        "connection.password": "*****",
        "insert.mode": "upsert",
        "auto.create": true,
        "auto.evolve": true,
        "pk.mode": "kafka",
        "pk.fields": "__connect_topic,__connect_partition,__connect_offset",
        "batch.size": 30
    }
}

ENVIRONMENT variables of my kafka-connect container:

      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_PLUGIN_PATH: /usr/share/java

I have a separate producer which produces JSON data to Kafka without a schema defined.

kccheung avatar Mar 05 '19 06:03 kccheung

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
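
For reference, a minimal sketch of the envelope the JsonConverter expects when value.converter.schemas.enable is set to true: the JSON carries both a "schema" and a "payload" block. The field names and values below are placeholders, not taken from this topic:

{
    "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
            { "field": "id", "type": "int64", "optional": false },
            { "field": "name", "type": "string", "optional": true }
        ]
    },
    "payload": {
        "id": 42,
        "name": "example"
    }
}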

rmoff avatar Mar 28 '19 12:03 rmoff

We'll keep this open so that we can catch this earlier and give a much better exception message.

rhauch avatar Aug 23 '19 13:08 rhauch

Facing the same issue with MongoDB as source and MySQL as sink. Requesting your help.

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"

krishkir avatar Aug 30 '19 11:08 krishkir

And if I have a schemaless source topic, is it possible to create a separate Avro schema for the topic data and use it so I end up with a well-defined schema?

fabiotc avatar Sep 11 '19 08:09 fabiotc

@krishkir As explained, the JDBC sink connector requires schemas to be enabled

@fabiotc You'd have to write a custom Connect SMT or other processor that could parse your topic and return a new record with a schema applied
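
To illustrate the idea, here is a rough, hypothetical sketch of such an SMT (not code from this connector or from this thread): it wraps a schemaless Map value in a hard-coded Struct schema. The class name and the id/name fields are made-up placeholders.

package com.example.smt; // hypothetical package and class name

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public class ApplyFixedSchema<R extends ConnectRecord<R>> implements Transformation<R> {

  // Hard-coded schema purely for illustration; a real SMT would build this from configuration.
  private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
      .name("example.Value")
      .field("id", Schema.OPTIONAL_STRING_SCHEMA)
      .field("name", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Map)) {
      return record; // pass through anything that isn't a schemaless map
    }
    Map<?, ?> value = (Map<?, ?>) record.value();
    Struct struct = new Struct(VALUE_SCHEMA)
        .put("id", asString(value.get("id")))
        .put("name", asString(value.get("name")));
    // Return a copy of the record with the schema and Struct value applied.
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), VALUE_SCHEMA, struct, record.timestamp());
  }

  private static String asString(Object o) {
    return o == null ? null : o.toString();
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {
  }

  @Override
  public void close() {
  }
}

You'd then put the compiled class on the worker's plugin path and reference it from the connector with the usual transforms=... properties.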

OneCricketeer avatar Sep 18 '19 10:09 OneCricketeer

Thanks @cricket007

fabiotc avatar Sep 18 '19 11:09 fabiotc

Some notes on this: https://rmoff.net/2020/01/22/kafka-connect-classcastexception/

rmoff avatar Jan 22 '20 17:01 rmoff

@cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is a SMT I wrote for appending schema to a record https://github.com/yousufdev/kafka-connect-append-schema, hope it helps.

ghost avatar Jan 23 '20 10:01 ghost

Thanks @rmoff. @yousufdev, thanks for this! And using your example, is it possible to append a schema to a complex and nested JSON structure?

fabiotc avatar Jan 23 '20 12:01 fabiotc

@fabiotc Yes, you can append a schema to a complex structure; just pass it in the 'schema' property of the SMT.

ghost avatar Jan 24 '20 06:01 ghost

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

The JDBC Sink requires a schema for your data. I'm not sure why this is triggering the error you're seeing, but you definitely need to include a schema, either as part of your JSON or using Avro. See https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

I have this issue and need some help here, please.

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 04:43:53,220] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 04:43:53,220] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

This is my source connector (I'm pulling the string from a txt file):

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
key.converter.schemas.enable=false
file=demo-file.txt
tasks.max=1
value.converter.schemas.enable=false
topic=demo-2-distributed
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

This is my JDBC sink connector configuration (I'm trying to sink to Postgres):

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=postgres
topics=demo-2-distributed
tasks.max=1
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
auto.evolve=true
connection.user=postgres
value.converter.schemas.enable=false
auto.create=true
connection.url=jdbc:postgresql://postgres:5432/postgres
value.converter=org.apache.kafka.connect.json.JsonConverter
insert.mode=upsert
key.converter=org.apache.kafka.connect.storage.StringConverter

ggabmc avatar Oct 08 '21 04:10 ggabmc

java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct

JDBC sink requires Structured data, not strings.

FileStream source only writes strings unless you use a HoistField transform, for example

OneCricketeer avatar Oct 08 '21 04:10 OneCricketeer

java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct

JDBC sink requires Structured data, not strings.

FileStream source only writes strings unless you use a HoistField transform, for example

Hi, I have added the HoistField transform in the source connector like this:

transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line

Now I'm able to see the string as JSON in the topic, but I'm getting this issue in the JDBC sink connector:

java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:97)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2021-10-08 05:52:49,638] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:456)
[2021-10-08 05:52:49,639] ERROR Task sink-postgres-file-distributed-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTas

ggabmc avatar Oct 08 '21 05:10 ggabmc

That error has already been answered in this thread

https://github.com/confluentinc/kafka-connect-jdbc/issues/609#issuecomment-577300246

How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself; it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.
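
For reference, a minimal sketch of a JDBC source connector config; the connection details, table, and column names are placeholders. Records produced this way carry a Connect schema, so the sink's Struct requirement is satisfied:

{
    "name": "jdbc_source_example",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://source-host:5432/sourcedb",
        "connection.user": "user",
        "connection.password": "password",
        "table.whitelist": "my_table",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "jdbc-"
    }
}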

OneCricketeer avatar Oct 08 '21 06:10 OneCricketeer

That error has already been answered in this thread

#609 (comment)

How about using the JDBC source connector (or Debezium) from one database to another? The problem isn't the connector itself; it's the data you're sending through Connect, and the FileStream source just isn't a good example to use.

Thank you so much, I changed the way I was doing it. I'm now sending the message directly to the topic with the kafka-console-producer, including the schema and payload, and the sink is able to write those fields to the DB. Thank you for your help; I'm building a proof of concept to present at work.

  kafka-console-producer.bat --bootstrap-server 127.0.0.1:9092 --topic demo-2-distributed

  {
    "schema": {
      "type": "struct",
      "optional": false,
      "version": 1,
      "fields": [
        { "field": "id", "type": "string", "optional": true },
        { "field": "artist", "type": "string", "optional": true },
        { "field": "song", "type": "string", "optional": true }
      ]
    },
    "payload": {
      "id": 1,
      "artist": "Rick Astley",
      "song": "Never Gonna Give You Up"
    }
  }

Kafka JDBC Sink Connector

{ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.password": "postgres", "topics": "demo-2-distributed", "tasks.max": "1", "key.converter.schemas.enable": "false", "fields.whitelist": "Id,artist,song", "auto.evolve": "true", "connection.user": "postgres", "value.converter.schemas.enable": "true", "auto.create": "true", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "insert.mode": "upsert", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "pk.mode": "kafka", "pk.fields": "__connect_topic,__connect_partition,__connect_offset" }

ggabmc avatar Oct 08 '21 06:10 ggabmc

Keeping the schema as part of each message isn't recommended. Rather, you can use the JSON Schema, Avro, or Protobuf console producers plus their corresponding converters.

https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
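
For example, a rough sketch of doing this with Avro and Schema Registry; the registry URL, topic, and record fields are placeholders, and the exact CLI flags vary by Confluent Platform version. Sink converter settings:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"

Matching producer:

kafka-avro-console-producer --broker-list 127.0.0.1:9092 --topic demo-2-distributed \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type":"record","name":"Song","fields":[{"name":"artist","type":"string"},{"name":"song","type":"string"}]}'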

OneCricketeer avatar Oct 08 '21 12:10 OneCricketeer

@cricket007 Thanks for the suggestion on writing a custom SMT. @fabiotc here is a SMT I wrote for appending schema to a record https://github.com/yousufdev/kafka-connect-append-schema, hope it helps.

@OneCricketeer @fabiotc, can you please star my SMT repo again? My account got compromised and deleted. Thanks

yousufdev avatar Oct 31 '21 09:10 yousufdev