kafka-connect-storage-common
Expose `ignore.default.for.nullables`
Problem
https://github.com/confluentinc/schema-registry/pull/2326 introduced the ignore.default.for.nullables Avro converter config property. However, the storage connectors currently cannot take advantage of it because it is not an exposed config. For example, when using the S3 sink connector, null values are still replaced with defaults, as detailed in this issue. Because the config is not exposed, ignore.default.for.nullables always comes in with its default of false:
[2024-02-08 00:58:35,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 00:58:35,673] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] AvroDataConfig values:
allow.optional.map.keys = false
connect.meta.data = true
discard.type.doc.default = false
enhanced.avro.schema.support = true
generalized.sum.type.support = false
ignore.default.for.nullables = false
schemas.cache.config = 1000
scrub.invalid.names = false
(io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 00:58:42,099] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 00:58:42,100] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
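For context, the Avro format writer inside the storage connectors builds its own AvroData from the connector configuration rather than from the value.converter.* properties. Below is a minimal sketch of that behaviour, assuming the real AvroData/AvroDataConfig classes from io.confluent.connect.avro but with an illustrative property map:

```java
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;

import java.util.HashMap;
import java.util.Map;

public class AvroDataDefaultSketch {
  public static void main(String[] args) {
    // Only configs the connector actually exposes end up in this map.
    Map<String, Object> connectorProps = new HashMap<>();
    connectorProps.put("connect.meta.data", "true");
    connectorProps.put("schemas.cache.config", 1000);
    // "ignore.default.for.nullables" is not exposed, so it never lands here
    // and AvroDataConfig falls back to its declared default of false.

    AvroDataConfig avroDataConfig = new AvroDataConfig(connectorProps);
    AvroData avroData = new AvroData(avroDataConfig);

    // Prints "false" -- matching the AvroDataConfig values in the log above.
    System.out.println(avroDataConfig.getBoolean("ignore.default.for.nullables"));
  }
}
```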
Solution
Expose the ignore.default.for.nullables option so that it can be configured.
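A minimal sketch of what exposing the option could look like on the connector side, using Kafka's standard ConfigDef API; the description text and placement are illustrative, not the exact change in this PR:

```java
import org.apache.kafka.common.config.ConfigDef;

public class ExposedConfigSketch {
  // Define the converter property on the connector's own ConfigDef so it is
  // accepted, validated, and forwarded to the AvroDataConfig used by the writer.
  static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define(
          "ignore.default.for.nullables",
          ConfigDef.Type.BOOLEAN,
          false,                      // keep the converter's default
          ConfigDef.Importance.LOW,
          "Whether to write null for nullable fields instead of substituting "
              + "the field's Avro default value.");
}
```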
Does this solution apply anywhere else?
- [ ] yes
- [x] no
If yes, where?
Test Strategy
I rebuilt kafka-connect-storage-core-11.2.4.jar with the changes included in this PR, then ran some manual tests with the S3 connector to confirm that the option takes effect. Here's what my S3 sink settings look like:
{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"tasks.max":"1",
"errors.deadletterqueue.context.headers.enable":"true",
"errors.deadletterqueue.topic.name":"db_ingestion_dead_letter_queue",
"errors.deadletterqueue.topic.replication.factor":"1",
"filename.offset.zero.pad.widthrotate_interval_ms":"12",
"flush.size":"500000",
"locale":"en",
"partition.duration.ms":"60000",
"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\''year'\''=YYYY/'\''month'\''=MM/'\''day'\''=dd/'\''hour'\''=HH",
"retry.backoff.ms":"5000",
"rotate.interval.ms":"15000",
"rotate.schedule.interval.ms":"60000",
"s3.bucket.name":"my-bucket",
"s3.part.size":"5242880",
"s3.region":"us-west-2",
"schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compability":"NONE ",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"timezone":"UTC",
"topics.dir":"developer/kafka-connect-avro/data/raw",
"topics.regex":"dbzium\\.inventory\\..+",
"format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
"key.converter.auto.registry.schemas": "true",
"key.converter.ignore.default.for.nullables": "true",
"schema.name.adjustment.mode":"avro",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v7",
"value.converter.auto.registry.schemas": "true",
"value.converter.ignore.default.for.nullables": "true",
"ignore.default.for.nullables": "true"
}
After starting the connector, I see that the ignore.default.for.nullables setting was correctly applied based on the logs below:
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Creating S3 client. (io.confluent.connect.s3.storage.S3Storage:89)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Created a retry policy for the connector (io.confluent.connect.s3.storage.S3Storage:170)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] Returning new credentials provider based on the configured credentials provider class (io.confluent.connect.s3.storage.S3Storage:175)
[2024-02-08 17:05:36,672] INFO [kafka-to-s3|task-0] S3 client created (io.confluent.connect.s3.storage.S3Storage:107)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] AvroDataConfig values:
allow.optional.map.keys = false
connect.meta.data = true
discard.type.doc.default = false
enhanced.avro.schema.support = true
generalized.sum.type.support = false
ignore.default.for.nullables = true
schemas.cache.config = 1000
scrub.invalid.names = false
(io.confluent.connect.avro.AvroDataConfig:369)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink record writer provider. (io.confluent.connect.s3.S3SinkTask:119)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Created S3 sink partitioner. (io.confluent.connect.s3.S3SinkTask:121)
[2024-02-08 17:05:36,921] INFO [kafka-to-s3|task-0] Started S3 connector task with assigned partitions: [] (io.confluent.connect.s3.S3SinkTask:135)
Testing done:
- [ ] Unit tests
- [ ] Integration tests
- [ ] System tests
- [x] Manual tests
Release Plan
I don't understand why we need to add the ignore.default.for.nullables option here.
Do you know why deserialization in the confluent schema-registry library with the value.converter.ignore.default.for.nullables option is not working?
I do not, but I suppose it's similar to why this PR was needed to expose the scrub.invalid.names config.
The config is picked up and does work with producers (e.g. Debezium), but I wasn't able to get it working with the S3 sink until I introduced the changes in this PR.
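To illustrate the distinction discussed above: the value.converter.* properties only configure the converter instance the worker creates, while the Avro format writer in the sink builds a separate AvroData from the connector configuration (the AvroDataConfig block in the logs). A rough sketch of the converter side, with illustrative values taken from the sink config above:

```java
import io.confluent.connect.avro.AvroConverter;

import java.util.Map;

public class ConverterSideSketch {
  public static void main(String[] args) {
    // This is roughly what the worker does with the sink's value.converter.*
    // settings; it configures the converter only, not the AvroData instance
    // the S3 format writer constructs from the connector config.
    AvroConverter valueConverter = new AvroConverter();
    valueConverter.configure(
        Map.of(
            "schema.registry.url", "http://registry:8080/apis/ccompat/v7",
            "ignore.default.for.nullables", "true"),
        false /* isKey */);
  }
}
```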