s3-sqs-connector

SqsClient: Unexpected error while parsing SQS message next on empty iterator

Open • kartthikn opened this issue 4 years ago • 1 comment

Error while parsing SQS message

The data below was sent to an SQS queue using the Python boto3 library:

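import json

import boto3

# queue_url is assumed to hold the URL of the target SQS queue;
# the region matches the one used by the Scala reader below
sqs = boto3.client("sqs", region_name="eu-west-1")

data = {'ticket_id': '3000613'}

# Send message to SQS queue
response = sqs.send_message(
    QueueUrl=queue_url,
    DelaySeconds=2,
    MessageAttributes={},
    MessageBody=json.dumps(data)
)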

When the message is received from SQS, it looks like this:

{
   "MessageId":"95a45ab6-8a6e-4403-90e8-e3d0fb132ec3",
   "ReceiptHandle":"AQEBAw2+roaCV6U8PaNCiuV4cBHKfsb+rboAnQTekZa7CStJ5Hdbu9Nbpvvph5OqNtcupXvtuPM7OOTRXmJGrJ26+DIf/vdshZ+HIcCgqhEbaHBR4L3qQ3o+ClpwoNY0VZAB4VFQPD/mrHTUP9nAfYKGNszuU2Q1riRYgc9ClYO5KOcmdo2POWk+lrW5uDIr95lccuOmj+T0OBzy0pPxFquqOpSAbj7XyGEXRIz/ocW3MCP42WaoT4PdJAII0ylx0BYbZC5qWLqkEc1mYudgUhV1dadFM58xb6Gv71WI00V+RvaZwFbL/T19z9KqIu+Z0F7hH/Tpe15xxHpZ5yl6tSi+QAEoGMD6UshjpizspQ08Q98OEDAP0xLk0F99fC88AVcf8kJ11Icbv5raXzXnTikFxA==",
   "MD5OfBody":"ef4ab943de701a13d7e3359d3f19df98",
   "Body":"{\"ticket_id\": \"3000613\"}",
   "Attributes":{
      "SentTimestamp":"1605700176692"
   }
}
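Note that `Body` is the string produced by `json.dumps(data)`, so it stays a JSON-encoded string inside the message and has to be decoded a second time to get the original dictionary back. (With boto3's `receive_message`, the outer envelope already arrives as a dict, but `Body` is still a string.) A minimal sketch using the message above, with the long `ReceiptHandle` omitted:

```python
import json

# Raw message as shown above (ReceiptHandle omitted for brevity).
raw_message = """{
  "MessageId": "95a45ab6-8a6e-4403-90e8-e3d0fb132ec3",
  "MD5OfBody": "ef4ab943de701a13d7e3359d3f19df98",
  "Body": "{\\"ticket_id\\": \\"3000613\\"}",
  "Attributes": {"SentTimestamp": "1605700176692"}
}"""

message = json.loads(raw_message)   # decode the outer SQS envelope
body = json.loads(message["Body"])  # Body is itself a JSON string

print(body)  # {'ticket_id': '3000613'}
```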

The following Scala code was written to receive the message:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// spark (a SparkSession) and queueUrl are assumed to be defined elsewhere
val schema = new StructType()
  .add(StructField("ticket_id", StringType))
val fileFormat = "json"

val inputDf = spark
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXX")
  .option("awsSecretKey", "XXXXXXX")
  .option("fileFormat", fileFormat)
  .option("sqsFetchIntervalSeconds", "2")
  .option("useInstanceProfileCredentials", "false")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .option("maxFilesPerTrigger", "50")
  .option("ignoreFileDeletion", "true")
  .load()

val query = inputDf.writeStream
  .queryName("sqs_records")    // this query name will be the table name
  .outputMode("append")
  .format("memory")
  .option("useInstanceProfileCredentials", "false")
  .option("region", "eu-west-1")
  .option("awsAccessKeyId", "XXXXXX")
  .option("awsSecretKey", "XXXXXXXX")
  .start()

The message is never received, and the following warning is logged instead:

20/11/18 12:52:37 WARN SqsClient: Unexpected error while parsing SQS message next on empty iterator

Note: We observed that the parseSqsMessages() method in SqsClient always expects an S3 event-notification message; any other message body makes it fail with the error above.
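To illustrate the difference, here is a rough Python sketch of the kind of check a consumer of S3 event notifications performs. It is not the connector's code (the connector is written in Scala); it only assumes the standard S3 event notification layout, where each entry in `Records` carries `s3.bucket.name` and `s3.object.key`. The helper `extract_s3_paths`, the bucket `my-bucket`, and the key `tickets/3000613.json` are hypothetical.

```python
import json
from typing import List
from urllib.parse import unquote_plus


def extract_s3_paths(body: str) -> List[str]:
    """Return s3:// paths from an S3 event notification body (hypothetical helper).

    Raises ValueError for anything else, e.g. a plain application payload
    such as {"ticket_id": "3000613"}.
    """
    event = json.loads(body)
    records = event.get("Records") if isinstance(event, dict) else None
    if not records:
        # A plain JSON payload has no "Records" array, so there is
        # no file path to hand to the reader.
        raise ValueError("not an S3 event notification")
    paths = []
    for record in records:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 notifications are URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        paths.append(f"s3://{bucket}/{key}")
    return paths


# Hypothetical S3 event notification, trimmed to the fields used above.
s3_event = json.dumps({
    "Records": [{
        "eventSource": "aws:s3",
        "eventName": "ObjectCreated:Put",
        "s3": {"bucket": {"name": "my-bucket"},
               "object": {"key": "tickets/3000613.json"}},
    }]
})

print(extract_s3_paths(s3_event))  # ['s3://my-bucket/tickets/3000613.json']

try:
    extract_s3_paths(json.dumps({"ticket_id": "3000613"}))
except ValueError as err:
    print(err)  # not an S3 event notification
```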

kartthikn, Nov 18 '20 13:11

@kartthikn This library allows Spark SQL streaming applications to read files from S3, using SQS together with S3 event notifications to optimize file listing. It is not meant for using SQS itself as a data source for Spark.
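Under that model the data has to arrive as files in S3, and SQS only carries the notifications that point at them. A minimal sketch of the producer side, assuming the bucket is already configured (in S3, not in this code) to send `ObjectCreated` event notifications to the queue used as `sqsUrl`. The helper `upload_records`, the bucket, and the key are hypothetical; `put_object` is the standard boto3 S3 client call.

```python
import json
from typing import Iterable, Mapping


def upload_records(s3_client, bucket: str, key: str,
                   records: Iterable[Mapping]) -> None:
    """Write records to S3 as a JSON Lines file (hypothetical helper).

    Assumes fileFormat "json" is read with Spark's JSON reader, which by
    default expects one JSON object per line.
    """
    payload = "\n".join(json.dumps(r) for r in records) + "\n"
    s3_client.put_object(Bucket=bucket, Key=key,
                         Body=payload.encode("utf-8"))


# Usage with a real client (requires AWS credentials):
#   import boto3
#   s3 = boto3.client("s3", region_name="eu-west-1")
#   upload_records(s3, "my-bucket", "tickets/3000613.json",
#                  [{"ticket_id": "3000613"}])
```

With files in this shape, the `ticket_id` schema in the Scala reader describes the contents of the S3 files rather than the SQS message.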

abhishekd0907, Apr 04 '21 13:04