kafka-connect-storage-cloud
S3 Sink connector overwrites partition settings
Hi,
I'm using a Debezium source connector and the S3 sink connector to read data from MySQL and write it to an S3 bucket. The source connector is configured to create topics automatically in Kafka with 10 partitions and a replication factor of 2. The problem is that the S3 sink connector overwrites this partition config and the topics are created with 1 partition and 1 replica instead. When I create the topics manually and start the connectors afterwards, the topics have the correct number of partitions. I'm unable to find a reason why the S3 sink connector would cause this.
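For reference, the manual workaround looks roughly like the sketch below: a KafkaTopic resource pre-creating the sink's input topic with the desired partition count. This assumes the Kafka cluster is also Strimzi-managed and that its name matches the strimzi.io/cluster label used in the connector configs below; adjust both to your setup.

apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaTopic
metadata:
  # Kubernetes object names cannot contain underscores, so the actual
  # topic name goes into spec.topicName instead.
  name: mnols-deposit-transaction
  labels:
    strimzi.io/cluster: mnols-debezium-kafka  # assumed Kafka cluster name
spec:
  topicName: deposit_db_server_mnols.deposit_service.deposit_transaction
  partitions: 10
  replicas: 2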
We are using:
S3 sink connector version: 10.2.4
Debezium version: 1.8.1.Final
Strimzi 0.27.1
Our configs for the source and sink connectors:
---
apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaConnector
metadata:
  name: mnols-deposit-transaction-mysql-source1
  labels:
    strimzi.io/cluster: mnols-debezium-kafka
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    connector.class: "io.debezium.connector.mysql.MySqlConnector"
    database.allowPublicKeyRetrieval: "true"
    database.server.id: "84561278048"
    database.hostname: "{{ .Values.debezium.db_hostname }}"
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/UsernamePassword:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/UsernamePassword:mysql_password}"
    database.history.kafka.bootstrap.servers: "{{ .Values.debezium.bootstrap_servers }}:9092"
    database.history.kafka.topic: "schema_changes.deposit_service_mnols"
    database.server.name: "deposit_db_server_mnols"
    database.history.skip.unparseable.ddl: true
    database.include.list: "deposit_service"
    table.include.list: "deposit_service.deposit_transaction,deposit_service.mnols_debezium_signal"
    signal.data.collection: "deposit_service.deposit_transaction"
    database.whitelist: "deposit_service"
    topic.creation.enable: false
    auto.create.topics.enable: false
    topic.creation.default.replication.factor: 2
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: "delete"
    topic.creation.default.delete.retention.ms: 86400000
    topic.creation.default.retention.ms: 604800000
    connect.timeout.ms: "120000"
    decimal.handling.mode: "string"
    time.precision.mode: "connect"
    snapshot.mode: "initial"
    snapshot.locking.mode: "none"
    enable.time.adjuster: false
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: false
    value.converter: "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    value.converter.schemas.enable: true
    value.converter.region: "eu-central-1"
    value.converter.schemaAutoRegistrationEnabled: true
    value.converter.avroRecordType: "GENERIC_RECORD"
    value.converter.schemaName: "mnols-deposit-transaction"
    value.converter.registry.name: "{{ .Values.environment }}-glue-schema-registry"
    value.converter.enhanced.avro.schema.support: true
    value.converter.ignore.default.for.nullables: true
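Side note on the topic.creation.* keys above, in case it matters: in Kafka Connect, the connector-level topic.creation.default.* properties only take effect when the worker itself has topic.creation.enable=true, and auto.create.topics.enable is a broker setting rather than a connector one. A minimal sketch of where the worker flag lives in Strimzi, assuming the KafkaConnect resource shares the name from the strimzi.io/cluster label:

apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaConnect
metadata:
  name: mnols-debezium-kafka  # assumed worker/cluster name
spec:
  config:
    # Worker-level flag; when false, the connector's
    # topic.creation.default.* properties are ignored.
    topic.creation.enable: true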
---
apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaConnector
metadata:
  name: mnols-deposit-transaction-s3-sink1
  labels:
    strimzi.io/cluster: mnols-debezium-kafka
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 5
  config:
    name: "mnols-deposit-transaction-s3-sink1"
    connector.class: "io.confluent.connect.s3.S3SinkConnector"
    storage.class: "io.confluent.connect.s3.storage.S3Storage"
    topics: "deposit_db_server_mnols.deposit_service.deposit_transaction"
    format.class: "io.confluent.connect.s3.format.avro.AvroFormat"
    flush.size: "1000"
    s3.bucket.name: "{{ .Values.environment }}-mnols-kafka-topics"
    s3.region: "eu-central-1"
    partitioner.class: "io.confluent.connect.storage.partitioner.FieldPartitioner"
    partition.field.name: "country_code"
    partition.duration.ms: "3600000"
    topic.creation.enable: false
    auto.create.topics.enable: false
    # partitioner.class: "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
    # path.format: "'year='yyyy/'month='MM/'day='dd/'hour='HH"
    # timestamp.extractor: "RecordField"
    # timestamp.field: "created_at"
    # timezone: "UTC"
    # locale: "en"
    transforms: unwrap,insertOffset,created_at_ts,updated_at_ts
    transforms.created_at_ts.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.created_at_ts.field: "created_at"
    transforms.created_at_ts.format: "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
    transforms.created_at_ts.target.type: "string"
    transforms.updated_at_ts.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.updated_at_ts.field: "updated_at"
    transforms.updated_at_ts.format: "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
    transforms.updated_at_ts.target.type: "string"
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: true
    transforms.unwrap.delete.handling.mode: rewrite
    transforms.unwrap.add.fields: op
    transforms.insertOffset.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.insertOffset.offset.field: __offset
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: false
    value.converter: "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    value.converter.schemas.enable: true
    value.converter.region: "eu-central-1"
    value.converter.schemaAutoRegistrationEnabled: true
    value.converter.avroRecordType: "GENERIC_RECORD"
    value.converter.schemaName: "mnols-deposit-transaction"
    value.converter.registry.name: "{{ .Values.environment }}-glue-schema-registry"
    value.converter.enhanced.avro.schema.support: true
    value.converter.ignore.default.for.nullables: true
    schema.compatibility: "NONE"
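To be clear, the partitioner.class in the sink only controls the S3 object key layout, not the Kafka topic partitions (and partition.duration.ms applies only to the commented-out TimeBasedPartitioner). A sketch of the layout the FieldPartitioner should produce, assuming the default topics.dir of "topics" and an example country_code value:

# s3://{{ .Values.environment }}-mnols-kafka-topics/topics/
#   deposit_db_server_mnols.deposit_service.deposit_transaction/country_code=NO/
#     deposit_db_server_mnols.deposit_service.deposit_transaction+0+0000000000.avro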
It would be great if someone could take a look :)