AmazonS3MoveCleanupPolicy doesn't clean up files during Kafka Connect worker restarts
Hello, I have Kafka Connect with 2 workers and a FilePulse connector with 4 tasks. During a rolling restart of the workers, the connector stops cleaning up files from S3. Once the restart is finished, new files are processed and cleaned up successfully, but the files that were skipped are never cleaned up afterwards; all of them have the status "COMMITTED" in the status topic. Here is my connector config:
aws.s3.region: "us-east-1"
aws.s3.bucket.name: "test-xxxxx-data-us-east-1"
aws.s3.bucket.prefix: "emails/"
topic: "test-xxxxx-data"
fs.listing.class: io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing
fs.listing.interval.ms: 5000
fs.cleanup.policy.class: io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy
fs.cleanup.policy.move.success.aws.prefix.path: "success"
fs.cleanup.policy.move.failure.aws.prefix.path: "failure"
allow.tasks.reconfiguration.after.timeout.ms: 120000
tasks.reader.class: io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3BytesArrayInputReader
tasks.file.status.storage.bootstrap.servers: kafka-cluster-kafka-bootstrap:9093
tasks.file.status.storage.topic: test-xxxxx-data-status-internal
tasks.file.status.storage.topic.partitions: 10
tasks.file.status.storage.topic.replication.factor: 3
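For reference, this is a minimal sketch of how one could inspect the status topic and list the files that are stuck in COMMITTED. It assumes the record values can be decoded as JSON with a "status" field (the actual serialization FilePulse uses may differ), and it omits any TLS/SASL settings the listener on port 9093 may require; the group id is hypothetical.

import json
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "kafka-cluster-kafka-bootstrap:9093",
    "group.id": "filepulse-status-inspector",  # hypothetical group id
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
})
consumer.subscribe(["test-xxxxx-data-status-internal"])

latest = {}  # keep only the most recent status seen per file key
try:
    while True:
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            break  # assume we reached the end of the topic
        if msg.error():
            continue
        key = msg.key().decode("utf-8") if msg.key() else None
        value = json.loads(msg.value()) if msg.value() else {}
        latest[key] = value.get("status")
finally:
    consumer.close()

for key, status in latest.items():
    if status == "COMMITTED":
        print(f"still COMMITTED: {key}")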
There are no errors in the logs. Just a thought: would it be possible, or would it make sense, for AmazonS3MoveCleanupPolicy to check file statuses retrospectively and clean up any file that is still in the "COMMITTED" state?
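In the meantime, a possible one-off workaround sketch (not part of the connector) would be to replay for the stuck files what the move cleanup policy would have done: copy each object under the "success" prefix and delete the original. The list of stuck keys is assumed to come from the status-topic inspection above and the example key is hypothetical; please double-check the exact destination layout the policy produces before running anything like this against real data.

import boto3

BUCKET = "test-xxxxx-data-us-east-1"
SOURCE_PREFIX = "emails/"
SUCCESS_PREFIX = "success/"

# Hypothetical list of objects still marked COMMITTED in the status topic.
stuck_keys = ["emails/example-object"]

s3 = boto3.client("s3", region_name="us-east-1")
for key in stuck_keys:
    # Mirror a move: copy to the success prefix, then delete the source object.
    dest_key = SUCCESS_PREFIX + key[len(SOURCE_PREFIX):]
    s3.copy_object(
        Bucket=BUCKET,
        CopySource={"Bucket": BUCKET, "Key": key},
        Key=dest_key,
    )
    s3.delete_object(Bucket=BUCKET, Key=key)
    print(f"moved {key} -> {dest_key}")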