debezium-timestamp-converter
debezium-timestamp-converter copied to clipboard
Timestamp converter is not working when use with SMT
I've to used timestamp converter with SMT "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState" but message in topic unix timestamp fields not convert to format that i want here is my example configuration of kafka connector
{ "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "converters": "timestampConverter", "auto.create.topics.enable": "false", "topic.creation.enable": "true", "topic.creation.default.partitions": "3", "topic.prefix": "compare-payment-installment-mongo", "topic.creation.default.replication.factor": "3", "topic.creation.default.compression.type": "gzip", "topic.creation.default.file.delete.delay.ms": "432000000", "topic.creation.default.cleanup.policy": "delete", "topic.creation.default.retention.ms": "432000000", "timestampConverter.debug": "false", "timestampConverter.format.date": "YYYY-MM-dd", "timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss'Z'", "timestampConverter.format.time": "HH:mm:ss", "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter", "tombstones.on.delete": "false", "mongodb.connection.string" : "", "mongodb.name": "", "mongodb.user" : "", "mongodb.password" : "", "mongodb.authSource": "", "mongodb.connection.mode": "", "database.include.list" : "", "name": "compare-payment-installment-mongo-source-connector", "collection.include.list": "", "schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092", "schema.history.internal.kafka.topic": "compare-payment-installment-mongo-schema", "transforms": "unwrap, ReplaceField, RenameField, transform-name", "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.ReplaceField.exclude": "source", "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState", "transforms.unwrap.collection.expand.json.payload": "true", "transforms.unwrap.add.fields": "op,ts_ms", "transforms.unwrap.add.fields.prefix": "", "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", "transforms.RenameField.renames": "_id:id", "transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.transform-name.field": "ts_ms", "transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "transforms.transform-name.target.type": "string", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "capture.mode": "change_streams_update_full_with_pre_image", "snapshot.mode": "never", "capture.scope": "database", "tasks.max": "1" } how can i solve this problem ?
Thank you