clickhouse-sink-connector
clickhouse-sink-connector copied to clipboard
Tables do not get created in ClickHouse
This is my debezium config:
{
  "name": "${CONNECTOR_NAME}",
  "config": {
    "connector.class": "${CONNECTOR_CLASS}",
    "tasks.max": "1",
    "snapshot.mode": "schema_only",
    "snapshot.locking.mode": "none",
    "snapshot.delay.ms": 10000,
    "include.schema.changes": "true",
    "include.schema.comments": "true",
    "database.hostname": "${HOST}",
    "database.port": "${PORT}",
    "database.user": "${USER}",
    "database.password": "${PASSWORD}",
    "database.server.id": "${DATABASE_SERVER_ID}",
    "database.server.name": "${DATABASE_SERVER_NAME}",
    "database.whitelist": "${DBS}",
    "database.allowPublicKeyRetrieval": "true",
    "database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
    "database.history.kafka.topic": "${KAFKA_TOPIC}",
    "database.ssl.mode": "required",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url": "http://schemaregistry:8081",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 2,
    "provide.transaction.metadata": "true",
    "max.batch.size": 20000,
    "max.queue.size": 100000,
    "max.queue.size.in.bytes": 1000000000,
    "topic.prefix": "${DATABASE_SERVER_NAME}",
    "schema.history.internal.kafka.topic": "${KAFKA_TOPIC}",
    "schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
    "tombstones.on.delete": "true"
  }
}
And this is my sink config:
{
  "name": "${CONNECTOR_NAME}",
  "config": {
    "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
    "tasks.max": "3",
    "topics.regex": "SERVER5432.*",
    "clickhouse.server.url": "${CLICKHOUSE_HOST}",
    "clickhouse.server.user": "${CLICKHOUSE_USER}",
    "clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
    "clickhouse.server.port": "${CLICKHOUSE_PORT}",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter.schema.registry.url": "http://schemaregistry:8081",
    "enable.snapshot.ddl": true,
    "batch.max.records": 1000,
    "sink.connector.max.queue.size": 1000,
    "single.threaded": false,
    "topic.creation.default.partitions": 3,
    "snapshot.mode": "initial",
    "max.batch.size": 1000,
    "batch.size": 1000,
    "metrics.enable": true,
    "metrics.port": 8084,
    "buffer.flush.time.ms": 1000,
    "buffer.max.records": 10000,
    "thread.pool.size": 3,
    "fetch.min.bytes": 5242880,
    "enable.kafka.offset": false,
    "store.kafka.metadata": false,
    "auto.create.tables": true,
    "auto.create.tables.replicated": true,
    "schema.evolution": true,
    "replacingmergetree.delete.column": "_sign",
    "treat.tombstone.as.delete": true,
    "delete.enabled": true
  }
}
The container log shows that the connector detects a column of type ARRAY and converts it. I also see preCommit entries in the logs, but the tables are not created in ClickHouse.
My goal is only to create the tables from the source (MySQL) in ClickHouse. I am using the altinity/clickhouse-sink-connector:2.4.0-kafka image.
Is something missing?
I would recommend that you use the lightweight sink connector. See for example https://github.com/Altinity/clickhouse-sink-connector/discussions/810. It works very well with MySQL, including DDL.