kafka-connect-jdbc
kafka-connect-jdbc copied to clipboard
JDBC Sink fails to alter MySQL table
Streaming Avro data to MySQL with the JDBC Sink, connector aborts if switching from "pk.mode": "none"
to "pk.mode": "kafka"
with the error:
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}, as it is not optional and does not have a default value
Connector config:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "dev-dsb-errors-mysql-sink6",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "PAGEVIEWS_REGIONS",
"connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
"auto.create": "true",
"auto.evolve":"true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"pk.mode": "kafka"
}
}'
Logs:
[2019-02-06 15:29:13,235] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,236] DEBUG Querying MySql dialect column metadata for catalog:null schema:null table:PAGEVIEWS_REGIONS (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,252] INFO Setting metadata for table "PAGEVIEWS_REGIONS" to Table{name='"PAGEVIEWS_REGIONS"', columns=[Column{'GENDER', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'REGIONID', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'NUMUSERS', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true}, SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}] among column names [GENDER, REGIONID, NUMUSERS] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink5-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, as it is not optional and does not have a default value
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:132)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
If the table is dropped first, the connector then succeeds, creating the table from scratch with the additional columns.
Damn it! I got this
Damn it! I got this
The problem is still present
I got this error as well. DDL on my field is not null and doesn't have default value. How to ignore or set default value for this field?
Still got this as well
still got this.
Faced the same issue today.
same is today mysql,
Still present
I am facing this as well. This seems to be a common case. I could find that this is a design choice from the docs here
For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.
But, manually changing the schema is a dirty solution and is not scalable. Do we have some workaround here? Or can we expect some fixes?