kafka-connect-storage-cloud

Avoid late records preemptively rotating/committing S3 output files

Open frankgrimes97 opened this issue 1 year ago • 3 comments

When late data is arriving on a Kafka partition (e.g. data for the previous hourly encodedPartition) the following check triggers an immediate rotation and commit of files:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410

The problem is exacerbated when late data is interleaved with up-to-date data. In that case, a quick succession of rotations causes a large number of small files to be committed to S3. This hurts both the throughput of Kafka Connect and downstream consumers, which have to deal with the many small file fragments.

This PR adds a new S3SinkConnectorConfig option, max.open.files.per.partition. It defaults to 1, which preserves the existing behavior.

If set to a value > 1, the following behavior is enabled:

  • A separate commit file is kept open for each encodedPartition target up to a maximum of max.open.files.per.partition

  • Rotation occurs only when any of the encodedPartition targets hits its rotation condition (flush.size, rotate.interval.ms), and it commits all open files. All files are committed so that S3Sink's pre-commit hook can commit a high-watermark offset to the Kafka consumer group without leaving gaps of buffered data still in flight below that offset.
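To make the two points above concrete, here is a minimal, self-contained sketch of the idea. It is not the code in this PR or in TopicPartitionWriter: the class and method names are hypothetical, only a flush.size-style record count is modelled (rotate.interval.ms is left out), and committing everything when a new encodedPartition would exceed the open-file limit is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not the connector's actual writer. */
class MultiFileWriterSketch {
    private final int maxOpenFiles; // stands in for max.open.files.per.partition
    private final int flushSize;    // stands in for flush.size

    // encodedPartition -> records buffered in the file currently open for it
    private final Map<String, List<String>> openFiles = new LinkedHashMap<>();
    // One "encodedPartition:recordCount" entry per file committed so far
    private final List<String> committed = new ArrayList<>();

    MultiFileWriterSketch(int maxOpenFiles, int flushSize) {
        this.maxOpenFiles = maxOpenFiles;
        this.flushSize = flushSize;
    }

    void write(String encodedPartition, String record) {
        // Assumption: if a new target would exceed the limit, commit everything first.
        // With maxOpenFiles == 1 this rotates on every encodedPartition change.
        if (!openFiles.containsKey(encodedPartition) && openFiles.size() >= maxOpenFiles) {
            commitAll();
        }
        List<String> buffer =
            openFiles.computeIfAbsent(encodedPartition, k -> new ArrayList<>());
        buffer.add(record);

        // Any single target hitting its rotation condition commits all open files,
        // so no buffered records are left behind the committed offset.
        if (buffer.size() >= flushSize) {
            commitAll();
        }
    }

    void commitAll() {
        for (Map.Entry<String, List<String>> e : openFiles.entrySet()) {
            committed.add(e.getKey() + ":" + e.getValue().size());
        }
        openFiles.clear();
    }

    List<String> committedFiles() {
        return committed;
    }

    int openFileCount() {
        return openFiles.size();
    }
}
```

With a limit of 2 and a flush size of 3, a late record for hour 7 arriving between records for hour 8 no longer forces a rotation: both files stay open until hour 8 reaches three records, at which point both are committed together. With a limit of 1, the same input commits a file on every switch between hours, which is the small-file pattern described earlier.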

It's worth noting that this issue/limitation was previously encountered and is well-described as part of: "CC-2313 Handle late arriving records in storage cloud sink connectors" https://github.com/confluentinc/kafka-connect-storage-cloud/pull/187

However, that feature was subsequently reverted: https://github.com/confluentinc/kafka-connect-storage-cloud/commit/a2ce6fc34478f3377192e8aa2d98e01db0dbf951 https://github.com/confluentinc/kafka-connect-storage-common/pull/87

N.B. Unlike the solution proposed in CC-2313, we do not write late data to an incorrect encodedPartition, i.e. late data for hour 7 will not land in a path/file for hour 8.

frankgrimes97, Oct 21 '22 15:10

Hi, it's not clear to me that the Jenkins-public-CI integration test failures are due to my code changes. "AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;" Is that CI check broken? (I see most other outside-contributed PRs have similar failed checks)

frankgrimes97, Nov 01 '22 14:11

Quick follow-up... we've been running this feature branch stably for a number of weeks now. It has both improved throughput and reduced the proliferation of unnecessarily small files when late data is processed by the S3 Sink Connector.

We'd very much prefer to have this merged upstream so that we don't need to maintain our own fork going forward. Is there any interest on Confluent's side in working with us to get this merged?

Cheers!

frankgrimes97, Dec 08 '22 15:12

@kkonstantine I see you worked on and approved https://github.com/confluentinc/kafka-connect-storage-cloud/pull/187. Any chance you could help us get this PR reviewed and hopefully merged upstream? Thanks!

frankgrimes97, Mar 28 '23 14:03