
Small Files when encoded partition changes from record to record

Open piyuverma opened this issue 5 years ago • 3 comments

In https://github.com/confluentinc/kafka-connect-storage-cloud/blob/5.5.x/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java, lines 360–364 read:

```java
boolean periodicRotation = rotateIntervalMs > 0
    && timestampExtractor != null
    && (
        recordTimestamp - baseRecordTimestamp >= rotateIntervalMs
        || !encodedPartition.equals(currentEncodedPartition)
    );
```

Because of the highlighted check, `!encodedPartition.equals(currentEncodedPartition)`, the writer compares the previous record's encoded partition with that of the current record and, on a mismatch, rotates (closes and commits) the open file. This produces very tiny output files whenever records with different encoded partitions are interleaved on the Kafka topic partition the task is consuming.

My question is: why is this OR condition, i.e. `!encodedPartition.equals(currentEncodedPartition)`, needed? Is there a specific, mandatory reason for this check that I am failing to understand?

To check the behaviour further, I created a branch that removes this condition, so that lines 360–364 look like this:

```java
boolean periodicRotation = rotateIntervalMs > 0
    && timestampExtractor != null
    && (recordTimestamp - baseRecordTimestamp >= rotateIntervalMs);
```

I then started the connector against records with different encoded partitions interleaved on the same topic partition. As I expected, it worked fine and created far fewer files: the number of files equalled the number of distinct encoded partitions produced within `rotate.interval.ms`, and each file contained only the records that produced its encoded partition.
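To illustrate the effect, here is a minimal, self-contained sketch (hypothetical class, not connector code) that only counts rotations for a stream of (timestamp, encoded partition) records, mimicking the `rotate.interval.ms` check with and without the partition-equality condition. It ignores the connector's per-partition writer map and other details:

```java
import java.util.List;

public class RotationSim {
    /**
     * Counts how many files get committed for a stream of records, using a
     * simplified version of the periodicRotation check from TopicPartitionWriter.
     */
    static int countFiles(List<String> partitions, long[] timestamps,
                          long rotateIntervalMs, boolean rotateOnPartitionChange) {
        int files = 0;
        long baseTimestamp = -1;   // timestamp of the first record in the open file
        String current = null;     // encoded partition of the open file
        for (int i = 0; i < partitions.size(); i++) {
            boolean rotate = baseTimestamp >= 0
                && (timestamps[i] - baseTimestamp >= rotateIntervalMs
                    || (rotateOnPartitionChange && !partitions.get(i).equals(current)));
            if (rotate) {
                files++;             // commit the open file
                baseTimestamp = -1;  // the next record starts a new file
            }
            if (baseTimestamp < 0) {
                baseTimestamp = timestamps[i];
                current = partitions.get(i);
            }
        }
        if (baseTimestamp >= 0) files++;  // flush the last open file
        return files;
    }

    public static void main(String[] args) {
        // Two encoded partitions interleaved record by record,
        // all well within a single rotate interval.
        List<String> parts = List.of("a", "b", "a", "b", "a", "b");
        long[] ts = {0, 1, 2, 3, 4, 5};
        // With the equality check, every partition change rotates: one tiny file per record.
        System.out.println(countFiles(parts, ts, 1000, true));
        // Without it, no rotation happens inside the interval.
        System.out.println(countFiles(parts, ts, 1000, false));
    }
}
```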

piyuverma avatar Dec 02 '20 13:12 piyuverma

We have a custom partitioner. In addition to the date, it also adds the value of a record field as a partition, like so: `category=string/date=24-04-2-21`. The connector consumes topic records with different `category` values. With `rotate.schedule.interval.ms` everything works as expected, but when I set `rotate.interval.ms`, the S3 bucket is flooded with small files. The logs show that a file is closed as soon as the partitioned field's value changes at the next offset, so files end up only 1–2 records long. This is quite unexpected behavior. Having a separate topic and connector per category is impractical for us. Is it possible to have exactly-once delivery with a custom partitioner like this?
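For reference, the two rotation settings being contrasted here look roughly like this in the sink config (the partitioner class name is a placeholder; the property names are from the S3 sink connector):

```properties
connector.class=io.confluent.connect.s3.S3SinkConnector
# Hypothetical custom partitioner producing category=<value>/date=<date> paths
partitioner.class=com.example.CategoryDatePartitioner

# Option A: wall-clock scheduled rotation. Tolerates interleaved encoded
# partitions, but does not give exactly-once semantics.
rotate.schedule.interval.ms=600000

# Option B: record-timestamp based rotation. Required for exactly-once, but
# (per the code quoted above) also rotates whenever the encoded partition
# changes between consecutive records.
#rotate.interval.ms=600000
```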

maxstepanov avatar Mar 24 '21 14:03 maxstepanov

It happened to our team as well. We were using a timestamp value defined via `RecordField` when this happened. We ended up switching to the `Record` timestamp because we realized we wouldn't get exactly-once delivery otherwise, as the documentation explains.

However, my colleague tried to set up his connector using the same strategy above and again had the problem.

paulochf avatar Jan 11 '22 18:01 paulochf

I think I have the same issue. We have a custom partitioner that writes files to a path like `<env>/<topic>/machine-<id>/year-2022/month-2/day-8/hour-22/<file>+<partition>+<topic>.av`, and:

  • if I set `rotate.schedule.interval.ms`, it tries to open too many writers and crashes with an out-of-heap error (maybe because of the number of machines?)
  • if I set `rotate.interval.ms`, it opens a file, writes a few records, commits, switches to another machine/topic, writes a few records, commits, and we end up with tens of files within each hour

We don't set `timestamp.extractor`, so it defaults to `Wallclock`, which should make the behavior deterministic.

Update: using `"timestamp.extractor": "Record"` gives the same result (the partition timestamp obviously changes).

What would be nice is having one writer per machine per topic per partition, so that a writer doesn't have to commit a file (with only a couple of records in it) every time data from a different machine arrives on the topic. Since we receive data from multiple machines at a time, we can easily get one message from one machine followed immediately by a message from another.
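The idea above could be sketched as keeping one open writer per encoded partition and rotating each writer independently, instead of rotating the single current file on every partition change. This is a hypothetical illustration of the routing, not the connector's actual code (the `Writer` interface and class names are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class PerPartitionWriters {
    /** Hypothetical stand-in for the connector's record writer. */
    interface Writer {
        void write(String record);
        void commit();
    }

    private final Map<String, Writer> writers = new HashMap<>();

    // Route each record to a writer keyed by its encoded partition, so that
    // interleaved partitions don't force a rotation of each other's files.
    void write(String encodedPartition, String record) {
        writers.computeIfAbsent(encodedPartition, p -> newWriter(p)).write(record);
    }

    // Rotation then happens per writer, e.g. when that writer's own interval elapses.
    void rotate(String encodedPartition) {
        Writer w = writers.remove(encodedPartition);
        if (w != null) {
            w.commit();
        }
    }

    int openWriterCount() {
        return writers.size();
    }

    private Writer newWriter(String encodedPartition) {
        return new Writer() {
            public void write(String record) { /* append to the open file */ }
            public void commit() { /* upload the file and close it */ }
        };
    }
}
```

The trade-off, hinted at by the heap crash mentioned above, is that one open writer per encoded partition can consume a lot of memory when there are many machines.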

alex88 avatar Feb 08 '22 22:02 alex88