camel-kafka-connector

Camel AWS S3 source connector

Open ShaniAvnaim opened this issue 4 years ago • 17 comments

Hello,

I have an issue with the AWS S3 connector. Every file appears twice (or more) in my Kafka cluster. The topic, file name, timestamp, etc. are the same; the only difference is the creation time in Kafka.

I have an additional question: in general, what message processing guarantees does this connector provide?

Thank you

ShaniAvnaim avatar Oct 24 '21 14:10 ShaniAvnaim
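
To confirm that the same file really lands in the topic more than once, the consumed records can be grouped by topic and file name. The following is only a minimal sketch: `Record` and `find_duplicates` are hypothetical names, and it assumes the record key carries the S3 object key, which depends on how the connector is configured.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple


class Record(NamedTuple):
    """Hypothetical, simplified view of a consumed Kafka record."""
    topic: str
    key: str       # assumption: the key holds the S3 object key (file name)
    offset: int
    payload: bytes


def find_duplicates(records: Iterable[Record]) -> Dict[Tuple[str, str], List[int]]:
    """Group offsets by (topic, key) and keep only the groups seen more than once."""
    seen: Dict[Tuple[str, str], List[int]] = defaultdict(list)
    for rec in records:
        seen[(rec.topic, rec.key)].append(rec.offset)
    return {k: offsets for k, offsets in seen.items() if len(offsets) > 1}


# Made-up sample data: the same file appears at two different offsets.
sample = [
    Record("s3", "test/a.csv", 0, b"..."),
    Record("s3", "test/a.csv", 1, b"..."),
    Record("s3", "test/b.csv", 2, b"..."),
]
duplicates = find_duplicates(sample)
print(duplicates)
```

The offsets in each group show where the repeated copies sit in the topic, which can help when comparing them against the connector logs.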

You need to specify the option deleteAfterRead or moveAfterRead to remove entries from the bucket and avoid consuming them again.

oscerd avatar Oct 24 '21 14:10 oscerd

I already use deleteAfterRead. The issue is that the connector takes a file from the bucket and inserts it into the topic more than once.

ShaniAvnaim avatar Oct 24 '21 14:10 ShaniAvnaim

Please report the configuration

oscerd avatar Oct 24 '21 14:10 oscerd

{
  "name": "S3Connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "camel.source.path.bucketNameOrArn": "bucket",
    "camel.source.endpoint.prefix": "test",
    "camel.source.endpoint.region": "eu-west-1",
    "camel.source.endpoint.accessKey": "",
    "camel.source.endpoint.secretKey": "",
    "camel.source.endpoint.deleteAfterRead": "true",
    "camel.source.endpoint.includeFolders": "false",
    "topics": "s3"
  }
}

ShaniAvnaim avatar Oct 24 '21 15:10 ShaniAvnaim
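
As a quick sanity check on a configuration like the one posted above, the clean-up options mentioned earlier in the thread can be read back from the JSON. This is a rough sketch: `cleanup_options` is a hypothetical helper, the sample is a trimmed copy of the posted configuration, and options that are not set explicitly are reported as `None` rather than guessing at the connector's defaults.

```python
import json
from typing import Dict, Optional

# Trimmed copy of the configuration posted in this thread.
CONNECTOR_JSON = """
{
  "name": "S3Connector",
  "config": {
    "camel.source.path.bucketNameOrArn": "bucket",
    "camel.source.endpoint.prefix": "test",
    "camel.source.endpoint.deleteAfterRead": "true",
    "camel.source.endpoint.includeFolders": "false",
    "topics": "s3"
  }
}
"""


def cleanup_options(connector_json: str) -> Dict[str, Optional[str]]:
    """Return the explicitly configured clean-up options, or None when unset."""
    config = json.loads(connector_json)["config"]
    prefix = "camel.source.endpoint."
    return {
        name: config.get(prefix + name)
        for name in ("deleteAfterRead", "moveAfterRead")
    }


options = cleanup_options(CONNECTOR_JSON)
print(options)
```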

How many instances?

oscerd avatar Oct 24 '21 15:10 oscerd

Just one; it's a test environment.

ShaniAvnaim avatar Oct 24 '21 16:10 ShaniAvnaim

Never seen this before. What is the connector version?

oscerd avatar Oct 24 '21 16:10 oscerd

The latest one from the Camel site. I think it is 0.11.

ShaniAvnaim avatar Oct 25 '21 08:10 ShaniAvnaim

Can you show the bucket structure? What is inside? I cannot reproduce it, by the way.

oscerd avatar Oct 25 '21 08:10 oscerd

The bucket is very simple. I use AWS Transfer Family as an SFTP server connected to a single S3 bucket with one folder (prefix) inside. The S3 connector then takes the file from the bucket/folder and inserts it into Kafka.

ShaniAvnaim avatar Oct 25 '21 09:10 ShaniAvnaim

Can you please try with includeFolders set to true?

oscerd avatar Oct 25 '21 09:10 oscerd

I tried includeFolders=true and got the same issue. I also thought it was related to uploading a file to a folder in the bucket, but I got the same issue when uploading a file directly to /.

ShaniAvnaim avatar Oct 25 '21 09:10 ShaniAvnaim

Just tried with 4 files inside a test folder; all of them were removed, and I saw 4 records in my Kafka topic (a fresh Kafka instance). So it's something on your side.

oscerd avatar Oct 25 '21 10:10 oscerd

Are you using Confluent's Schema Registry? I saw another issue on Stack Overflow that looks like mine...

ShaniAvnaim avatar Oct 25 '21 11:10 ShaniAvnaim

No. Are you running on Confluent Kafka with the Confluent Schema Registry?

oscerd avatar Oct 25 '21 11:10 oscerd

Yes, that's what I'm trying.

ShaniAvnaim avatar Oct 25 '21 11:10 ShaniAvnaim

In this particular case I don't see anything related to Schema Registry in the configuration. This project is tested on Apache Kafka, not on Confluent Kafka, so I'll need some time to set up a reproducer and check.

oscerd avatar Oct 25 '21 12:10 oscerd