kafka-connect-jdbc

JdbcSinkConnector (MySQL) auto.create:"true" not working

Open gregd72002 opened this issue 5 years ago • 3 comments

Hi, we observe that the sink connector sometimes creates the target table and at other times fails to do so. The connector runs in distributed mode. We have 3 sink connectors (one per topic->table mapping). Two of them created their target tables, but the third one failed to do so. Interestingly, when we reset everything and add the sink connectors in a different order, the outcome is different.


[2020-02-24 10:41:44,224] INFO Starting JDBC Sink task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2020-02-24 10:41:44,225] INFO JdbcSinkConfig values: 
        auto.create = true
        auto.evolve = true
        batch.size = 100
        connection.password = null
        connection.url = jdbc:mysql://...
        connection.user = null
        db.timezone = UTC
        delete.enabled = false
        dialect.name = 
        fields.whitelist = []
        insert.mode = upsert
        max.retries = 10
        pk.fields = []
        pk.mode = record_key
        quote.sql.identifiers = ALWAYS
        retry.backoff.ms = 3000
        table.name.format = ${topic}
 (io.confluent.connect.jdbc.sink.JdbcSinkConfig)
[2020-02-24 10:41:44,225] INFO Initializing writer using SQL dialect: MySqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2020-02-24 10:41:44,226] INFO WorkerSinkTask{id=sink_operator-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-02-24 10:41:44,478] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Cluster ID: lkc-loggx (org.apache.kafka.clients.Metadata)
[2020-02-24 10:41:44,480] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Discovered group coordinator b0-pkc-l9pve.eu-west-1.aws.confluent.cloud:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,500] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,730] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,758] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Finished assignment for group at generation 1: {connector-consumer-sink_operator-0-4d32156f-e193-447b-8796-f1daeba48d90=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@1693bee8} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,793] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-24 10:41:44,795] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Adding newly assigned partitions: metadataOperator-0, metadataOperator-1, metadataOperator-2, metadataOperator-3, metadataOperator-4, metadataOperator-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,827] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,828] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,829] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:44,831] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Found no committed offset for partition metadataOperator-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-02-24 10:41:45,173] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,183] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,190] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,192] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-2 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,193] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,221] INFO [Consumer clientId=connector-consumer-sink_operator-0, groupId=connect-sink_operator] Resetting offset for partition metadataOperator-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2020-02-24 10:41:45,284] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2020-02-24 10:41:45,590] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
[2020-02-24 10:41:45,913] INFO Checking MySql dialect for existence of table "metadataOperator" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2020-02-24 10:41:46,158] INFO Using MySql dialect table "metadataOperator" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2020-02-24 10:41:46,658] INFO Setting metadata for table "metadataOperator" to Table{name='"metadataOperator"', columns=[Column{'isActive', isPrimaryKey=false, allowsNull=true, sqlType=TINYINT}, Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=BIGINT}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=VARCHAR}, Column{'env', isPrimaryKey=true, allowsNull=false, sqlType=VARCHAR}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2020-02-24 10:41:47,654] WARN Write of 45 records failed, remainingRetries=10 (io.confluent.connect.jdbc.sink.JdbcSinkTask)
java.sql.BatchUpdateException: Table 'reportingdev.metadataOperator' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
        at com.mysql.cj.util.Util.getInstance(Util.java:167)
        at com.mysql.cj.util.Util.getInstance(Util.java:174)
        at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
        at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
        at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:211)
        at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:177)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLSyntaxErrorException: Table 'reportingdev.metadataOperator' doesn't exist
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
        at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
        ... 17 more
[2020-02-24 10:41:47,655] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)

gregd72002 • Feb 24 '20 10:02

@gregd72002 Can you share your connector configurations? What are the names of the tables? Do they share a database and the same credentials?

gharris1727 • Feb 24 '20 17:02

{
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 1,
        "connection.url": "jdbc:mysql://...",
        "topics": "metadataRestaurant",
        "pk.mode": "record_key",
        "insert.mode": "upsert",
        "delete.enabled": false,
        "auto.create": true,
        "auto.evolve": true,
        "batch.size": 100
}

The other 2 have the same properties (same credentials) other than topics. The topics are metadataRestaurant, metadataLocation, metadataOperator.
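
For anyone reproducing this, a configuration like the one above can be submitted to a distributed worker with `PUT /connectors/{name}/config` on the Connect REST API. The sketch below only builds the request. The connector name `sink_restaurant` and the worker URL `http://localhost:8083` are assumptions for illustration, and the values are written as strings here, which is a choice of this sketch rather than something taken from the thread.

```python
import json
import urllib.request


def build_config_request(worker_url, connector_name, config):
    """Build (but do not send) a PUT request that creates or updates a connector."""
    url = f"{worker_url.rstrip('/')}/connectors/{connector_name}/config"
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Values mirror the configuration above, written as strings in this sketch.
sink_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://...",  # elided in the original, kept as-is
    "topics": "metadataRestaurant",
    "pk.mode": "record_key",
    "insert.mode": "upsert",
    "delete.enabled": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "batch.size": "100",
}

# Hypothetical connector name and worker URL.
request = build_config_request("http://localhost:8083", "sink_restaurant", sink_config)
# To actually submit it: urllib.request.urlopen(request)
```

Repeating the call with a different name and `topics` value would register the other two connectors, which makes it easier to vary the creation order in a controlled way.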

The Connect worker configuration is set through environment variables:

CONNECT_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_SECURITY_PROTOCOL=SASL_SSL
CONNECT_SASL_MECHANISM=PLAIN
CONNECT_SASL_JAAS_CONFIG=...

CONNECT_PRODUCER_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM=PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG=...

CONNECT_CONSUMER_BOOTSTRAP_SERVERS=pkc-l9pve.eu-west-1.aws.confluent.cloud:9092
CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL
CONNECT_CONSUMER_SASL_MECHANISM=PLAIN
CONNECT_CONSUMER_SASL_JAAS_CONFIG=...

CONNECT_REST_ADVERTISED_HOST_NAME=localhost
CONNECT_LOG4J_ROOT_LOGLEVEL=INFO

CONNECT_GROUP_ID=connectors

CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter

CONNECT_CONFIG_STORAGE_TOPIC=connectors-config
CONNECT_OFFSET_STORAGE_TOPIC=connectors-offset
CONNECT_STATUS_STORAGE_TOPIC=connectors-status

CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars
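
To read these variables as worker properties, the sketch below applies the naming convention commonly used by the Confluent Docker images: drop the `CONNECT_` prefix, lower-case the rest, and turn single underscores into periods. That the worker runs in such an image is an assumption; the thread does not confirm it. The helper name is hypothetical, and it deliberately ignores multi-underscore escapes and any variables the image may treat specially (for example the log4j ones).

```python
def env_to_worker_property(name, prefix="CONNECT_"):
    """Map an environment variable name to a worker property name.

    Simplified sketch: handles only the single-underscore-to-period rule.
    Returns None when the variable does not carry the expected prefix.
    """
    if not name.startswith(prefix):
        return None
    return name[len(prefix):].lower().replace("_", ".")


# A few of the variables listed above.
for env_name in (
    "CONNECT_BOOTSTRAP_SERVERS",
    "CONNECT_CONSUMER_SASL_MECHANISM",
    "CONNECT_GROUP_ID",
    "CONNECT_CONFIG_STORAGE_TOPIC",
):
    print(env_name, "->", env_to_worker_property(env_name))
```

Under this mapping the `CONNECT_CONSUMER_*` variables become `consumer.*` overrides, which apply to the consumers used by sink tasks such as the one in the log.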

gregd72002 • Feb 25 '20 08:02

This solved the issue for me.

faizan-amjad • Apr 12 '20 18:04