kafka-connect-fs

SQS notifications not triggering

Open monsterdeeravi opened this issue 3 years ago • 5 comments

A file is uploaded to a folder in the S3 bucket s3://bucket-name/folder1/folder2/

The SQS queue receives a notification, but the connector doesn't do anything.

connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.regexp=.*
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=60000
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/xxx/SQS-queue-name
fs.uris=s3a://bucket-name/
policy.s3_event_notifications.delete_messages=true
name=test-fs-s3
topic=test-fs-s3
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

Kafka Connect runs on an EC2 instance, and its IAM role has been granted permission to access the SQS queue.

The setup itself works: the SQS Source connector (https://www.confluent.io/hub/confluentinc/kafka-connect-sqs) works and brings the notifications into a Kafka topic.

Could you please let me know if there is anything off with the settings defined above?

Thank you.

monsterdeeravi avatar Jun 18 '21 20:06 monsterdeeravi

Hi @monsterdeeravi

You're setting policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy, but to process files via event notifications you have to use policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy.

Let me know if that works.

mmolimar avatar Jun 22 '21 18:06 mmolimar

Thanks @mmolimar. Your suggestion got it past the previous state. However, it now fails with the error org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration: com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue;

The updated connector config now is:

connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/000/SQS-queue-name
fs.uris=s3a://bucket-name/folder1/folder2/
policy.s3_event_notifications.delete_messages=false
name=test-fs-s3-1
topic=test-fs-s3-1
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

I rechecked that the queue URL is correct; it matches what is in the SQS console as well as what has been applied to the SQS Source Connector.

Is there some other setting that needs to be included?

Thank you.

monsterdeeravi avatar Jun 23 '21 15:06 monsterdeeravi

I think policy.s3_event_notifications.queue should be just SQS-queue-name (the queue name, not the full URL).
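For anyone who copied the queue URL from the SQS console: the queue name is the last path segment of that URL. A quick sanity-check helper (hypothetical, not part of the connector) to extract it:

```python
from urllib.parse import urlparse

def queue_name_from_url(queue_url: str) -> str:
    """Return the bare queue name from a full SQS queue URL.

    SQS queue URLs have the shape
    https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>,
    so the queue name is simply the last path segment.
    """
    return urlparse(queue_url).path.rstrip("/").rsplit("/", 1)[-1]

# Using the (redacted) URL from the config above:
print(queue_name_from_url("https://sqs.us-east-1.amazonaws.com/000/SQS-queue-name"))
# -> SQS-queue-name
```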

mmolimar avatar Jun 23 '21 16:06 mmolimar

Thanks. That change let the connector run. Unfortunately, it doesn't respond to queue events.

Does the fs.uris property do anything in this case? It does complain if I change it to s3 instead of s3a, but looking at the code, the policy seems to take everything from the message and automatically prepends the s3a:// prefix.

Could it be that AWS S3 has issues being accessed via s3a instead of s3?
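If the policy does build its URIs from the notification payload, a sketch like the following (hypothetical, modeled on the documented S3 event JSON layout, not the connector's actual code) would explain why the s3a:// prefix appears regardless of fs.uris:

```python
import json

# A trimmed S3 event notification as delivered via SQS (structure per the
# AWS "Event message structure" docs; bucket and key values are made up).
event = json.loads("""
{
  "Records": [
    {
      "eventName": "ObjectCreated:Put",
      "s3": {
        "bucket": {"name": "bucket-name"},
        "object": {"key": "folder1/folder2/file.xml"}
      }
    }
  ]
}
""")

def uris_from_event(event: dict) -> list:
    """Build s3a:// file URIs from an S3 event notification payload."""
    return [
        "s3a://{}/{}".format(r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
        for r in event["Records"]
    ]

print(uris_from_event(event))
# -> ['s3a://bucket-name/folder1/folder2/file.xml']
```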

monsterdeeravi avatar Jun 23 '21 17:06 monsterdeeravi

You should use s3a. More info here.
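For readers wondering about the scheme: s3a:// is the Hadoop S3A filesystem scheme (the connector reads files through Hadoop's FileSystem API), while s3:// is what the AWS console shows. The URIs are structurally identical; only the scheme differs:

```python
from urllib.parse import urlparse

# Same bucket and path, two schemes; only the scheme component differs.
for uri in ("s3://bucket-name/folder1/folder2/", "s3a://bucket-name/folder1/folder2/"):
    p = urlparse(uri)
    print(p.scheme, p.netloc, p.path)
# s3 bucket-name /folder1/folder2/
# s3a bucket-name /folder1/folder2/
```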

mmolimar avatar Jun 26 '21 23:06 mmolimar