
JDBC Sink fails to alter MySQL table

Open · rmoff opened this issue 6 years ago · 11 comments

When streaming Avro data to MySQL with the JDBC Sink, the connector aborts after switching from "pk.mode": "none" to "pk.mode": "kafka", failing with the error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}, as it is not optional and does not have a default value

Connector config:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "dev-dsb-errors-mysql-sink6",
  "config": { 
   "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
   "tasks.max": "1",
   "topics": "PAGEVIEWS_REGIONS", 
   "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
   "auto.create": "true",
   "auto.evolve":"true",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "pk.mode": "kafka"
  }	   
}'

Logs:

[2019-02-06 15:29:13,235] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,236] DEBUG Querying MySql dialect column metadata for catalog:null schema:null table:PAGEVIEWS_REGIONS (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,252] INFO Setting metadata for table "PAGEVIEWS_REGIONS" to Table{name='"PAGEVIEWS_REGIONS"', columns=[Column{'GENDER', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'REGIONID', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'NUMUSERS', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true}, SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}] among column names [GENDER, REGIONID, NUMUSERS] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink5-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, as it is not optional and does not have a default value
	at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:132)
	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

If the table is dropped first, the connector then succeeds, creating the table from scratch with the additional columns.
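
For reference, the drop-and-recreate workaround looks roughly like this in MySQL. The CREATE TABLE below is only a sketch of what auto.create appears to produce for "pk.mode": "kafka"; the column names and types are taken from the log above, but the exact lengths and ordering are assumptions, not the connector's literal DDL:

-- Destructive: existing rows in the sink table are lost.
DROP TABLE `PAGEVIEWS_REGIONS`;

-- Approximation of the table the connector then auto-creates, with the
-- Kafka coordinates as a composite primary key.
CREATE TABLE `PAGEVIEWS_REGIONS` (
  `__connect_topic`     VARCHAR(256) NOT NULL,
  `__connect_partition` INT          NOT NULL,
  `__connect_offset`    BIGINT       NOT NULL,
  `GENDER`              VARCHAR(256),
  `REGIONID`            VARCHAR(256),
  `NUMUSERS`            BIGINT,
  PRIMARY KEY (`__connect_topic`, `__connect_partition`, `__connect_offset`)
);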

rmoff avatar Feb 06 '19 17:02 rmoff

Damn it! I got this

blcksrx avatar May 02 '20 14:05 blcksrx

> Damn it! I got this

The problem is still present

suikast42 avatar Jun 17 '20 20:06 suikast42

I got this error as well. The DDL for my field is NOT NULL and has no default value. How can I ignore this field or set a default value for it?

gavnyx avatar Apr 23 '21 07:04 gavnyx

Still got this as well

fizwan avatar Apr 01 '22 09:04 fizwan

still got this.

nguyentructran avatar Sep 16 '22 01:09 nguyentructran

Faced the same issue today.

0xSumitBanik avatar Oct 13 '22 18:10 0xSumitBanik

Same issue today with MySQL.

gokhansahin477 avatar Oct 27 '22 12:10 gokhansahin477

Still present

ghkdqhrbals avatar Jan 06 '23 07:01 ghkdqhrbals

I am facing this as well. This seems to be a common case. I found that this is a design choice, per the docs here:

> For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
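
For the table from the original report, that manual alteration might look like the sketch below. The default values ('' and 0) and the choice to also add the composite primary key are assumptions for illustration, not something the docs prescribe:

-- Add the Kafka-coordinate columns by hand, with defaults, so the sink's
-- ALTER check is satisfied; the defaults are placeholders, not real coordinates.
ALTER TABLE `PAGEVIEWS_REGIONS`
  ADD COLUMN `__connect_topic`     VARCHAR(256) NOT NULL DEFAULT '',
  ADD COLUMN `__connect_partition` INT          NOT NULL DEFAULT 0,
  ADD COLUMN `__connect_offset`    BIGINT       NOT NULL DEFAULT 0;

-- Optionally make them the primary key, matching what auto.create builds for
-- pk.mode=kafka. This step fails if more than one existing row shares the
-- placeholder defaults.
ALTER TABLE `PAGEVIEWS_REGIONS`
  ADD PRIMARY KEY (`__connect_topic`, `__connect_partition`, `__connect_offset`);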

But manually changing the schema is a dirty solution and doesn't scale. Is there a workaround, or can we expect a fix?

ihsan-96 avatar Feb 22 '24 13:02 ihsan-96