s3-sqs-connector
s3-sqs-connector copied to clipboard
SqsClient: Unexpected error while parsing SQS message next on empty iterator
Error while parsing SQS message
The below data has been sent to SQS queue via python boto3 library:
data = {'ticket_id': '3000613'}
# Send message to SQS queue
response = sqs.send_message(
QueueUrl=queue_url,
DelaySeconds=2,
MessageAttributes={},
MessageBody=json.dumps(data)
)
When we receive in SQS we are getting the following message:
{
"MessageId":"95a45ab6-8a6e-4403-90e8-e3d0fb132ec3",
"ReceiptHandle":"AQEBAw2+roaCV6U8PaNCiuV4cBHKfsb+rboAnQTekZa7CStJ5Hdbu9Nbpvvph5OqNtcupXvtuPM7OOTRXmJGrJ26+DIf/vdshZ+HIcCgqhEbaHBR4L3qQ3o+ClpwoNY0VZAB4VFQPD/mrHTUP9nAfYKGNszuU2Q1riRYgc9ClYO5KOcmdo2POWk+lrW5uDIr95lccuOmj+T0OBzy0pPxFquqOpSAbj7XyGEXRIz/ocW3MCP42WaoT4PdJAII0ylx0BYbZC5qWLqkEc1mYudgUhV1dadFM58xb6Gv71WI00V+RvaZwFbL/T19z9KqIu+Z0F7hH/Tpe15xxHpZ5yl6tSi+QAEoGMD6UshjpizspQ08Q98OEDAP0xLk0F99fC88AVcf8kJ11Icbv5raXzXnTikFxA==",
"MD5OfBody":"ef4ab943de701a13d7e3359d3f19df98",
"Body":"{\"ticket_id\": \"3000613\"}",
"Attributes":{
"SentTimestamp":"1605700176692"
}
}
The following code was written using scala to receive the message:
val schema = new StructType()
.add(StructField("ticket_id", StringType))
val fileFormat = "json"
val inputDf = spark
.readStream
.format("s3-sqs")
.schema(schema)
.option("sqsUrl", queueUrl)
.option("region", "eu-west-1")
.option("awsAccessKeyId", "XXXXX")
.option("awsSecretKey", "XXXXXXX")
.option("fileFormat", fileFormat)
.option("sqsFetchIntervalSeconds", "2")
.option("useInstanceProfileCredentials", "false")
.option("sqsLongPollingWaitTimeSeconds", "5")
.option("maxFilesPerTrigger", "50")
.option("ignoreFileDeletion", "true")
.load()
val query = inputDf.writeStream
.queryName("sqs_records") // this query name will be the table name
.outputMode("append")
.format("memory")
.option("useInstanceProfileCredentials", "false")
.option("region", "eu-west-1")
.option("awsAccessKeyId", "XXXXXX")
.option("awsSecretKey", "XXXXXXXX")
.start()
We are not able to receive the message and getting the below error:
20/11/18 12:52:37 WARN SqsClient: Unexpected error while parsing SQS message next on empty iterator
Note: We observed that SqsClient:parseSqsMessages() method always expects to get s3 events-notifications message, else it will throw error.
@kartthikn This library allows Spark SQL Streaming Applications to read files from S3 with optimized listing using SQS with the help of S3 event-based notifications. It is not meant for using SQS as a data source for Spark.