kafka-connect-fs
SQS notifications not triggering
A file is uploaded to a folder in the S3 bucket s3://bucket-name/folder1/folder2/.
The SQS queue gets a notification, but the connector doesn't do anything.
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.regexp=.*
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=60000
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/xxx/SQS-queue-name
fs.uris=s3a://bucket-name/
policy.s3_event_notifications.delete_messages=true
name=test-fs-s3
topic=test-fs-s3
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
Kafka Connect runs on an EC2 instance, and its IAM role has been granted permission to access the SQS queue.
The setup itself works: the SQS Source connector (https://www.confluent.io/hub/confluentinc/kafka-connect-sqs) reads the same queue and brings the notifications into a topic in Kafka.
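As a sanity check, queue access can also be confirmed outside of any connector. Below is a minimal sketch using the AWS Java SDK v1; the class name SqsAccessCheck and the queue URL are placeholders:

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;

public class SqsAccessCheck {
    public static void main(String[] args) {
        // Default credential chain: on EC2 this resolves to the instance role.
        AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
        // Placeholder: substitute the real queue URL from the SQS console.
        String queueUrl = "https://sqs.us-east-1.amazonaws.com/xxx/SQS-queue-name";
        for (Message m : sqs.receiveMessage(queueUrl).getMessages()) {
            System.out.println(m.getBody());
        }
    }
}

If this prints message bodies while a notification is pending, the role and queue are fine and the problem is in the connector config.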
Could you please let me know if there is anything off with the settings as defined above?
Thank you.
Hi @monsterdeeravi
You're setting policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy, but to process files via event notifications you have to use policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy.
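That is, the only line that needs to change is:

policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy

(The policy.sleepy.sleep setting then no longer applies.)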
Let me know if that worked
Thanks @mmolimar
Your suggestion allowed it to get past the previous state. However, it now fails with the error:
org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration: com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue;
The updated connector config now is:
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/000/SQS-queue-name
fs.uris=s3a://bucket-name/folder1/folder2/
policy.s3_event_notifications.delete_messages=false
name=test-fs-s3-1
topic=test-fs-s3-1
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
I rechecked that the queue URL is correct; it matches what is in the SQS console as well as what has been applied to the SQS Source Connector.
Is there some other setting that needs to be included?
Thank you.
I think policy.s3_event_notifications.queue should be just the queue name, SQS-queue-name, rather than the full URL.
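That is, assuming the queue really is named SQS-queue-name:

policy.s3_event_notifications.queue=SQS-queue-name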
Thanks. That change let the connector run. Unfortunately, it doesn't respond to queue events.
Does the fs.uris property do anything in this case? It does complain if I change it to s3 instead of s3a, but looking at the code, the policy seems to take everything from the message and to prepend the s3a:// prefix automatically.
Could it be that AWS S3 has issues being accessed via s3a instead of s3?
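To illustrate, the behavior would be consistent with something along these lines: a hypothetical sketch using the AWS SDK's S3EventNotification classes, not the connector's actual code:

import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

public class NotificationToUri {
    // Hypothetical sketch of the URI reconstruction step; not the connector's code.
    static String toS3aUri(S3EventNotificationRecord record) {
        String bucket = record.getS3().getBucket().getName();
        // Object keys arrive URL-encoded in notifications; decode before building the path.
        String key = record.getS3().getObject().getUrlDecodedKey();
        // The s3a:// scheme is prepended here, which would explain why fs.uris
        // doesn't need to point at the exact folder named in the notification.
        return "s3a://" + bucket + "/" + key;
    }

    public static void main(String[] args) {
        String body = args[0]; // raw SQS message body carrying the S3 event JSON
        for (S3EventNotificationRecord r : S3EventNotification.parseJson(body).getRecords()) {
            System.out.println(toS3aUri(r));
        }
    }
}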
You should use s3a. More info here.
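For anyone finding this later: s3a:// is the scheme of Hadoop's S3A filesystem client, which is what the connector reads S3 through; the old Hadoop s3:// scheme was removed in Hadoop 3. If extra S3A settings are needed, and if I read the connector's docs right about the policy.fs. pass-through prefix, they can be added to the connector config. For example, to pin credentials to the EC2 instance role (property name from hadoop-aws; treat this as a sketch):

policy.fs.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider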