kafka-connect-storage-cloud

S3 Sink connector overwrites partition settings

Open · olivernaaris opened this issue 1 year ago · 1 comment

Hi

I'm using the Debezium MySQL source connector and the S3 sink connector to read data from MySQL and write it to an S3 bucket. The source connector is configured to create topics automatically in Kafka with 10 partitions and a replication factor of 2. The problem is that the S3 sink connector seems to override the partition config: the topics are created with 1 partition and 1 replica instead. When I create the topics manually and start the connectors afterwards, the topics have the correct number of partitions. I can't find the reason why the S3 sink connector would cause this.
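
For reference, the manual workaround described above (creating the topic before either connector starts) could be expressed as a Strimzi `KafkaTopic` resource. This is only a sketch under assumptions: it presumes the Strimzi Topic Operator is running, and both the resource name and the `my-kafka-cluster` label value are placeholders, because the issue does not show the name of the `Kafka` cluster resource. The retention settings mirror the `topic.creation.default.*` values from the source connector config.

```yaml
# Sketch only: pre-create the CDC topic so nothing else creates it with broker defaults.
apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaTopic
metadata:
  # Placeholder name. Kubernetes resource names cannot contain underscores,
  # so the real Kafka topic name is given in spec.topicName instead.
  name: mnols-deposit-transaction-topic
  labels:
    # Placeholder: must match the name of the Kafka custom resource
    # (not the KafkaConnect cluster name used by the connectors).
    strimzi.io/cluster: my-kafka-cluster
spec:
  topicName: deposit_db_server_mnols.deposit_service.deposit_transaction
  partitions: 10
  replicas: 2
  config:
    cleanup.policy: delete
    retention.ms: 604800000
```

Applying this before deploying the two `KafkaConnector` resources should reproduce the "create manually, then start the connectors" case, in which the partition count comes out correctly.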

We are using:

S3 sink connector version: 10.2.4
Debezium version: 1.8.1.Final
Strimzi version: 0.27.1

Our config for the source and sink connectors:

---
apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaConnector
metadata:
  name: mnols-deposit-transaction-mysql-source1
  labels:
    strimzi.io/cluster: mnols-debezium-kafka
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    connector.class: "io.debezium.connector.mysql.MySqlConnector"
    database.allowPublicKeyRetrieval: "true"
    database.server.id: "84561278048"
    database.hostname: "{{ .Values.debezium.db_hostname }}"
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/UsernamePassword:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/UsernamePassword:mysql_password}"
    database.history.kafka.bootstrap.servers: "{{ .Values.debezium.bootstrap_servers }}:9092"
    database.history.kafka.topic: "schema_changes.deposit_service_mnols"
    database.server.name: "deposit_db_server_mnols"
    database.history.skip.unparseable.ddl: true
    database.include.list: "deposit_service"
    table.include.list: "deposit_service.deposit_transaction,deposit_service.mnols_debezium_signal"
    signal.data.collection: "deposit_service.deposit_transaction"
    database.whitelist: "deposit_service"
    topic.creation.enable: false
    auto.create.topics.enable: false
    topic.creation.default.replication.factor: 2
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: "delete"
    topic.creation.default.delete.retention.ms: 86400000
    topic.creation.default.retention.ms: 604800000
    connect.timeout.ms: "120000"
    decimal.handling.mode: "string"
    time.precision.mode: "connect"
    snapshot.mode: "initial"
    snapshot.locking.mode: "none"
    enable.time.adjuster: false
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: false
    value.converter: "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    value.converter.schemas.enable: true
    value.converter.region: "eu-central-1"
    value.converter.schemaAutoRegistrationEnabled: true
    value.converter.avroRecordType: "GENERIC_RECORD"
    value.converter.schemaName: "mnols-deposit-transaction"
    value.converter.registry.name: "{{ .Values.environment }}-glue-schema-registry"
    value.converter.enhanced.avro.schema.support: true
    value.converter.ignore.default.for.nullables: true
---
apiVersion: "kafka.strimzi.io/v1beta2"
kind: KafkaConnector
metadata:
  name: mnols-deposit-transaction-s3-sink1
  labels:
    strimzi.io/cluster: mnols-debezium-kafka
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 5
  config:
    name: "mnols-deposit-transaction-s3-sink1"
    connector.class: "io.confluent.connect.s3.S3SinkConnector"
    storage.class: "io.confluent.connect.s3.storage.S3Storage"
    topics: "deposit_db_server_mnols.deposit_service.deposit_transaction"
    format.class: "io.confluent.connect.s3.format.avro.AvroFormat"
    flush.size: "1000"
    s3.bucket.name: "{{ .Values.environment }}-mnols-kafka-topics"
    s3.region: "eu-central-1"
    partitioner.class: "io.confluent.connect.storage.partitioner.FieldPartitioner"
    partition.field.name: "country_code"
    partition.duration.ms: "3600000"
    topic.creation.enable: false
    auto.create.topics.enable: false
    #partitioner.class: "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
    #path.format: "'year='yyyy/'month='MM/'day='dd/'hour='HH"
    #timestamp.extractor: "RecordField"
    #timestamp.field: "created_at"
    #timezone: "UTC"
    #locale: "en"
    transforms: unwrap,insertOffset,created_at_ts,updated_at_ts
    transforms.created_at_ts.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.created_at_ts.field: "created_at"
    transforms.created_at_ts.format: "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
    transforms.created_at_ts.target.type: "string"
    transforms.updated_at_ts.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.updated_at_ts.field: "updated_at"
    transforms.updated_at_ts.format: "yyyy-MM-dd'T'HH:mm:ss.SSSSSS"
    transforms.updated_at_ts.target.type: "string"
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: true
    transforms.unwrap.delete.handling.mode: rewrite
    transforms.unwrap.add.fields: op
    transforms.insertOffset.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.insertOffset.offset.field: __offset
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    key.converter.schemas.enable: false
    value.converter: "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    value.converter.schemas.enable: true
    value.converter.region: "eu-central-1"
    value.converter.schemaAutoRegistrationEnabled: true
    value.converter.avroRecordType: "GENERIC_RECORD"
    value.converter.schemaName: "mnols-deposit-transaction"
    value.converter.registry.name: "{{ .Values.environment }}-glue-schema-registry"
    value.converter.enhanced.avro.schema.support: true
    value.converter.ignore.default.for.nullables: true
    schema.compatibility: "NONE"
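
One possible explanation, offered as an assumption rather than a confirmed diagnosis: if the broker setting `auto.create.topics.enable` is left at its default of `true`, a consumer that subscribes to a topic that does not exist yet can trigger broker-side auto-creation, which uses the broker defaults `num.partitions=1` and `default.replication.factor=1`. If the sink connector starts before Debezium has produced to the topic, that would match the observed 1 partition and 1 replica. The `topic.creation.default.*` settings only apply when Kafka Connect itself creates the topic on behalf of a source connector. Note also that `topic.creation.enable` is a Connect worker setting and `auto.create.topics.enable` is a broker setting, so the connector-level entries above probably have no effect. A minimal sketch of a sink-side mitigation, assuming the worker's `connector.client.config.override.policy` permits consumer overrides:

```yaml
# Sketch only: fragment to merge into the S3 sink KafkaConnector, not a complete resource.
spec:
  config:
    # Kafka consumer setting (default: true). With false, the sink's consumer
    # should no longer cause the broker to auto-create a missing topic.
    consumer.override.allow.auto.create.topics: false
```

Alternatively, setting `auto.create.topics.enable: false` in the broker configuration would rule out broker-side auto-creation for every client, leaving topic creation to the Connect worker or to pre-created topics.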

olivernaaris, Apr 10 '23 15:04

It would be nice if someone could take a look :)

olivernaaris, Jun 16 '23 18:06