kafka-connect-jdbc
JdbcSinkConnector (MySQL) auto.create:"true" not working
Hi, we observe that the sink connector sometimes creates the target table and at other times fails to do so. The connector itself runs in distributed mode. We have 3 sink connectors (one per topic->table). While two of them created their target tables, the third one failed to do so. Interestingly, when we reset everything and add the sink connectors in a different order, the outcome is different.
[2020-02-24 10:41:44,224] INFO Starting JDBC Sink task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2020-02-24 10:41:44,225] INFO JdbcSinkConfig values:
auto.create = true
auto.evolve = true
batch.size = 100
connection.password = null
connection.url = jdbc:mysql://...
connection.user = null
db.timezone = UTC
delete.enabled = false
dialect.name =
fields.whitelist = []
insert.mode = upsert
max.retries = 10
pk.fields = []
pk.mode = record_key
quote.sql.identifiers = ALWAYS
retry.backoff.ms = 3000
table.name.format = ${topic}
(io.confluent.connect.jdbc.sink.JdbcSinkConfig)
[2020-02-24 10:41:44,225] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2020-02-24 10:41:44,226] INFO WorkerSinkTask{id=sink_operator-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-02-24 10:41:44,478] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Cluster ID: lkc-loggx (org.apache.kafka.clients.Metadata)
[2020-02-24 10:41:44,480] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Discovered group coordinator b0-pkc-l9pve.eu-west-1.aws.confluent.cloud:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,500] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,730] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,758] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Finished assignment for group at generation 1: {connector-consumer-sink_operator-0-4d32156f-e193-447b-8796-f1daeba48d90=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1693bee8} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,793] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,795] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Adding newly assigned partitions: metadataOperator-0, metadataOperator-1, metadataOperator-2, metadataOperator-3, metadataOperator-4, metadataOperator-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,827] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,828] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,831] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:45,173] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,183] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,190] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,192] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,193] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,221] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,284] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2020-02-24 10:41:45,590] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
[2020-02-24 10:41:45,913] INFO Checking MySql dialect for existence of table "metadataOperator" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2020-02-24 10:41:46,158] INFO Using MySql dialect table "metadataOperator" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2020-02-24 10:41:46,658] INFO Setting metadata for table "metadataOperator" to Table{name='"metadataOperator"', columns=[Column{'isActive', isPrimaryKey=false, allowsNull=true, sqlType=TINYINT}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=BIGINT}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'env', isPrimaryKey=true, allowsNull=false, sqlType=VARCHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2020-02-24 10:41:47,654] WARN Write of 45 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
java.sql.BatchUpdateException: Table 'reportingdev.metadataOperator' doesn't exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
at com.mysql.cj.util.Util.getInstance(Util.java:167)
at com.mysql.cj.util.Util.getInstance(Util.java:174)
at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:211)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:177)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Table 'reportingdev.metadataOperator' doesn't exist
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
... 17 more
[2020-02-24 10:41:47,655] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
@gregd72002 Can you share your connector configurations? What are the names of the tables? Do they share a database and the same credentials?
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"connection.url": "jdbc:mysql://...",
"topics": "metadataRestaurant",
"pk.mode": "record_key",
"insert.mode": "upsert",
"delete.enabled": false,
"auto.create": true,
"auto.evolve": true,
"batch.size": 100
}
The other 2 connectors have the same properties (same credentials) apart from topics. The topics are metadataRestaurant, metadataLocation, and metadataOperator.
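For context, a minimal sketch of how the three sinks could be registered against the Connect REST API. This is illustrative only: the worker address (http://localhost:8083), the connector names, and the helper function are assumptions, and connection.url stays truncated exactly as above; config values are given as strings for the REST API.

# Sketch: register the three JdbcSinkConnector instances via the Kafka Connect REST API.
# Worker address, connector names, and the helper are assumptions for illustration;
# connection.url is truncated as in the issue.
import requests

WORKER = "http://localhost:8083"  # assumed Connect REST endpoint

BASE_CONFIG = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://...",
    "pk.mode": "record_key",
    "insert.mode": "upsert",
    "delete.enabled": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": "100",
}

def register_sink(topic: str) -> None:
    # Only the topic differs between the three connectors.
    body = {"name": f"sink_{topic}", "config": {**BASE_CONFIG, "topics": topic}}
    requests.post(f"{WORKER}/connectors", json=body).raise_for_status()

for topic in ("metadataRestaurant", "metadataLocation", "metadataOperator"):
    register_sink(topic)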
And the Connect worker itself is configured through environment variables:
CONNECT_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SASL_JAAS_CONFIG=...
CONNECT_PRODUCER_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG=...
CONNECT_CONSUMER_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG=...
CONNECT_REST_ADVERTISED_HOST_NAME=localhost
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO
CONNECT_GROUP_ID=connectors
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_CONFIG_STORAGE_TOPIC=connectors-config
CONNECT_OFFSET_STORAGE_TOPIC=connectors-offset
CONNECT_STATUS_STORAGE_TOPIC=connectors-status
CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars
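When reproducing this, one way to see which of the three sinks hit the "table doesn't exist" error is to poll the Connect REST status endpoint for each connector. A small sketch, again assuming a worker at http://localhost:8083 and placeholder connector names:

# Sketch: check task state and the last error trace for each sink connector.
# Worker address and connector names are placeholders, not taken from the issue.
import requests

WORKER = "http://localhost:8083"

for name in ("sink_restaurant", "sink_location", "sink_operator"):
    status = requests.get(f"{WORKER}/connectors/{name}/status").json()
    for task in status.get("tasks", []):
        # state is e.g. RUNNING or FAILED; trace holds the stack trace on failure
        print(name, task["id"], task["state"], task.get("trace", "")[:200])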
This solved the issue for me.