kafka-connect-storage-cloud
Avoid late records preemptively rotating/committing S3 output files
When late data is arriving on a Kafka partition (e.g. data for the previous hourly encodedPartition) the following check triggers an immediate rotation and commit of files:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410
The problem is exacerbated when late data is interleaved with up-to-date data. A quick succession of rotations then causes a large number of small files to be committed to S3. This hurts both the throughput of Kafka Connect and downstream consumers, which have to deal with the many small file fragments.
This PR adds a new max.open.files.per.partition setting to S3SinkConnectorConfig. It defaults to 1, which preserves the existing behavior.
If set to a value > 1, the following behavior is enabled:
- A separate commit file is kept open for each encodedPartition target, up to a maximum of max.open.files.per.partition.
- Rotation occurs only when one of the encodedPartition targets hits its rotation condition (flush.size, rotate.interval.ms), and it commits all open files. All files are committed so that the S3 sink's pre-commit hook can commit a high-watermark offset to the Kafka consumer group without leaving buffered, still-in-flight records below that offset.
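To make the effect on file counts concrete, here is a minimal, self-contained sketch of the rotation policy described above. It is not the code from this PR: the class LateRecordRotationSketch and the method countCommittedFiles are hypothetical names, only the flush.size condition is modeled (not rotate.interval.ms), and committing all open files when a new encodedPartition would exceed the limit is an assumption about the overflow case.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the rotation policy; not the connector's real classes.
final class LateRecordRotationSketch {

    /**
     * Counts how many files would be committed for a sequence of records,
     * where each record is represented only by its encodedPartition.
     * Files still open at the end are counted as one final commit each.
     */
    static int countCommittedFiles(List<String> encodedPartitions,
                                   int maxOpenFiles,
                                   int flushSize) {
        // encodedPartition -> number of records buffered in its open file
        Map<String, Integer> openFiles = new LinkedHashMap<>();
        int committed = 0;

        for (String encodedPartition : encodedPartitions) {
            // Assumption: a new target beyond the limit forces a commit of all open files.
            // With maxOpenFiles == 1 this reproduces "rotate on every partition change".
            if (!openFiles.containsKey(encodedPartition)
                    && openFiles.size() >= maxOpenFiles) {
                committed += openFiles.size();
                openFiles.clear();
            }

            int buffered = openFiles.merge(encodedPartition, 1, Integer::sum);

            // When any open file hits flush.size, every open file is committed,
            // so no buffered records remain below the committed offset.
            if (buffered >= flushSize) {
                committed += openFiles.size();
                openFiles.clear();
            }
        }
        return committed + openFiles.size();
    }

    public static void main(String[] args) {
        // Late records for hour 07 interleaved with current records for hour 08.
        List<String> records = Arrays.asList(
                "hour=08", "hour=07", "hour=08", "hour=07", "hour=08", "hour=07");

        System.out.println(countCommittedFiles(records, 1, 3)); // prints 6
        System.out.println(countCommittedFiles(records, 2, 3)); // prints 3
    }
}
```

In this toy sequence, a limit of 1 commits one file per record because every record switches encodedPartition, while a limit of 2 lets both hourly targets accumulate records until one of them reaches the flush size.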
It's worth noting that this issue/limitation was previously encountered and is well-described as part of: "CC-2313 Handle late arriving records in storage cloud sink connectors" https://github.com/confluentinc/kafka-connect-storage-cloud/pull/187
However, that feature was subsequently reverted: https://github.com/confluentinc/kafka-connect-storage-cloud/commit/a2ce6fc34478f3377192e8aa2d98e01db0dbf951 https://github.com/confluentinc/kafka-connect-storage-common/pull/87
N.B. Unlike the solution proposed in CC-2313, we do not write late data to an incorrect encodedPartition, i.e. late data for hour 7 will not land in a path/file for hour 8.
Hi, it's not clear to me that the Jenkins-public-CI integration test failures are due to my code changes. "AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;" Is that CI check broken? (I see most other outside-contributed PRs have similar failed checks)
Quick follow-up... we've been running this feature branch stably and successfully for a number of weeks now, and it has both improved throughput and reduced the proliferation of unnecessarily small files when late data is processed by the S3 Sink Connector.
We'd very much prefer to have this merged upstream so that we don't need to maintain our own fork going forward. Is there any interest on Confluent's side in working with us to get this merged?
Cheers!
@kkonstantine I see you worked on and approved https://github.com/confluentinc/kafka-connect-storage-cloud/pull/187. Any chance you could help us get this PR reviewed and hopefully merged upstream? Thanks!