kafka-connect-file-pulse

Schema Registry & Amazon S3 Move Cleanup Policy Questions

Open jwhitehead77 opened this issue 2 years ago • 1 comments

I am running the latest version of the Bitnami Kafka Docker images using Docker Compose. It has ZooKeeper, Kafka, Connect, and ksqlDB all configured and working with other connectors, so the setup is good. I also have LocalStack deployed in a container that mimics AWS services for my S3 bucket needs.

I needed a connector that would read CSV files stored in AWS S3 and provide parsing, filtering, and transformations so that I can create a proper Kafka message with a schema. After trying a number of connectors that just did not cut it, I finally came across File Pulse, and man, I was glad to find this connector, as it does about 90% of what I need.

However, I have questions about a couple of things, because either the documentation is lacking, there is a bug, or a feature that would be nice to have is missing. I just have not been able to pinpoint which one it is yet.

First, I am curious why the schema that is created is not sent to the Schema Registry, so that the message payload can flow through without the schema attached to each message. If the schema is in the Schema Registry, it can always be fetched when needed. I have Connect set up to enable schemas, so it seems that FilePulse does not support registering the schema...

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://schema-registry:8081
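
A hedged note on the settings above, offered as a possible explanation rather than a confirmed diagnosis: org.apache.kafka.connect.json.JsonConverter does not talk to Schema Registry. With schemas.enable=true it embeds the schema in every message, and the schema.registry.url setting has no effect on it. Registration is normally handled by a Schema Registry-aware converter on the Connect worker, not by the source connector itself. A minimal sketch of what that could look like, assuming the Confluent converter classes are installed on the worker's plugin path (the Bitnami image may not ship them):

```
key.converter=org.apache.kafka.connect.storage.StringConverter
# Assumption: Confluent's Avro converter is available to this Connect worker.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
```

If the payload should stay as JSON, io.confluent.connect.json.JsonSchemaConverter is the JSON Schema counterpart and takes the same schema.registry.url setting.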

Second, when I use the AmazonS3MoveCleanupPolicy and set the fs.cleanup.policy.move.success.aws.prefix.path and fs.cleanup.policy.move.failure.aws.prefix.path options, the file gets moved but no longer has a key.

The file entities.csv was placed in the root of the entity-data bucket, where the connector looks for files to process. This is a screenshot from the LocalStack UI after processing the file; as you can see, the key is blank. [image]

Connector Config Properties

{
  "tasks.max": "1",
  "skip.headers": "1",
  "topic": "entity-data-csv",
  "aws.s3.region": "us-east-1",
  "aws.s3.bucket.name": "entity-data",
  "aws.s3.path.style.access.enabled": "true",
  "aws.s3.default.object.storage.class":"STANDARD",
  "aws.s3.service.endpoint": "http://s3.localstack:4566",
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.AmazonS3MoveCleanupPolicy",
  "fs.cleanup.policy.move.success.aws.prefix.path": "processed",
  "fs.cleanup.policy.move.failure.aws.prefix.path": "failed",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader",
  "tasks.file.status.storage.bootstrap.servers": "kafka:9092",
  "filters": "parseRow,setKey",
  "filters.parseRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.parseRow.extract.column.name": "headers",
  "filters.parseRow.trim.column": "true",
  "filters.setKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.setKey.field": "$key",
  "filters.setKey.value": "$value.ssn",
  "file.filter.regex.pattern": ".*\\.csv$",
  "offset.attributes.string": "name+hash"
}

This is an awesome connector, and it saved me from a lot of coding to create my own.

Looking forward to your response.

Merry Christmas and Happy New Year,
Jason Whitehead

jwhitehead77 Dec 24 '23 23:12

I've been debugging the issue with the cleanup policy you described. Best I can tell, it works as expected only when the S3 object key has a leading slash. So entities.csv results in the behavior you're seeing, but /entities.csv works fine. I think I tracked it down to a location in the code where the assumed leading slash is parsed out, or an empty string is returned when it is not found. I agree this is unintuitive and confusing behavior.

https://github.com/streamthoughts/kafka-connect-file-pulse/blob/cbf4a2015ea71c51075cf2b55a609b9f149dc415/connect-file-pulse-filesystems/filepulse-amazons3-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/S3BucketKey.java#L72

sblair-metrostar Sep 20 '24 17:09