kafka-connect-field-and-time-partitioner
                        Kafka Connect Store Partitioner by custom fields and time
Kafka Connect Field and Time Based Partitioner
Summary
- Partition initially by custom fields and then by time.
- It extends TimeBasedPartitioner, so any existing time-based partition config should be fine, i.e. path.format will be respected.
- In order to make it work, set "partitioner.class"="com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner" and "partition.field.name"="<comma separated custom fields in your record>" in your connector config.
- Set partition.field.format.path=false if you don't want to use field labels for partition names (an illustration follows this list). For example, the following configuration

  {
      ...
      "s3.bucket.name" : "data",
      "partition.field.name" : "appId,eventName,country",
      "partition.field.format.path" : true,
      "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
      ...
  }

  will produce an output in the following format:

  /data/appId=XXXXX/eventName=YYYYYY/country=ZZ/year=2020/month=11/day=30
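For comparison, here is a hedged sketch of the layout the same configuration should produce with "partition.field.format.path" : false. Based on the description above, the field labels are dropped and only the field values appear in the path (an assumption for illustration, not taken from the original example):

/data/XXXXX/YYYYYY/ZZ/year=2020/month=11/day=30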
Example
KCONNECT_NODES=("localhost:18083" "localhost:28083" "localhost:38083")
for i in "${!KCONNECT_NODES[@]}"; do
    curl ${KCONNECT_NODES[$i]}/connectors -XPOST -H 'Content-type: application/json' -H 'Accept: application/json' -d '{
        "name": "connect-s3-sink-'$i'",
        "config": {     
            "topics": "events",
                "connector.class": "io.confluent.connect.s3.S3SinkConnector",
                "tasks.max" : 10,
                "flush.size": 50,
                "rotate.schedule.interval.ms": 600,
                "rotate.interval.ms": -1,
                "s3.part.size" : 5242880,
                "s3.region" : "us-east-1",
                "s3.bucket.name" : "playground-parquet-ingestion",        
                "topics.dir": "data",
                "storage.class" : "io.confluent.connect.s3.storage.S3Storage",        
                "partitioner.class": "com.canelmas.kafka.connect.FieldAndTimeBasedPartitioner",
                "partition.field.name" : "appId,eventName",
                "partition.duration.ms" : 86400000,
                "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
                "locale" : "US",
                "timezone" : "UTC",        
                "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
                "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter.schema.registry.url": "http://schema-registry:8081",
                "schema.compatibility": "NONE",                
                "timestamp.extractor": "RecordField",
                "timestamp.field" : "clientCreationDate",
                "parquet.codec": "snappy"                            
        }
    }'
done
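After the loop finishes, the Kafka Connect REST API can be used to confirm that each sink was created and its tasks are running. A minimal sketch against the first node, using the connector name generated by the loop above:

# list registered connectors on the first worker
curl localhost:18083/connectors
# show the status of the sink created in the first iteration
curl localhost:18083/connectors/connect-s3-sink-0/status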
Installation Guide
- Before building, make sure Maven and a Java Development Kit are installed.
- First, build the package using the following command: mvn package.
- After building, a new jar will be created at target/connect-fieldandtime-partitioner-1.1.0-SNAPSHOT.jar. Copy the jar file into the S3 connector plugin directory (see the sketch after this list).
- Restart the Kafka Connect worker; if you deploy with Helm, redeploy the release so that it can detect the plugin.
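A minimal sketch of those steps on a plain installation; the plugin directory below is a placeholder, use whatever directory your worker's plugin.path points at:

# build the partitioner jar
mvn package
# copy it next to the S3 sink connector jars (directory is an example)
cp target/connect-fieldandtime-partitioner-1.1.0-SNAPSHOT.jar /path/to/plugins/kafka-connect-s3/
# then restart the Kafka Connect worker so the new plugin is picked up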
Tips
Where is the plugin?
If the plugin was installed via confluent-hub, the jar file should be copied to /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/. However, if kafka-connect-s3-sink was installed somewhere else, place the jar file in the same directory as the connector plugin jars.
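For the confluent-hub layout above, the copy step is a single command (path taken from the tip; adjust it if your installation differs):

cp target/connect-fieldandtime-partitioner-1.1.0-SNAPSHOT.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/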