mangum

SQS message not returned back to the topic

Open jaekunchoi opened this issue 1 year ago • 1 comments

I have the code below, which is a custom Lambda handler:

import json

from mangum.handlers.utils import (
    handle_base64_response_body,
    handle_exclude_headers,
    handle_multi_value_headers,
    maybe_encode_body,
)
from mangum.types import LambdaConfig, LambdaContext, LambdaEvent, Response, Scope


class SqsHandler:
    """This handler is responsible for reading and processing SQS events
    that have triggered the Lambda function.
    """

    def __init__(self, event: LambdaEvent, context: LambdaContext, config: LambdaConfig) -> None:
        self.event = event
        self.context = context
        self.config = config

    @classmethod
    def infer(cls, event: LambdaEvent, context: LambdaContext, config: LambdaConfig) -> bool:
        """How to distinguish SQS events from other AWS Lambda triggers"""

        return (
            "Records" in event 
            and len(event["Records"]) > 0 
            and event["Records"][0]["eventSource"] == "aws:sqs"
        )

    @property
    def body(self) -> bytes:
        """The body of the actual REST request we want to send after getting the event."""

        message_body = self.event["Records"][0]["body"]
        request_body = json.dumps({"data": message_body, "service": "sqs"})

        return maybe_encode_body(request_body, is_base64=False)

    @property
    def scope(self) -> Scope:
        """A mapping of expected keys that Mangum adapter uses under the hood"""

        # ASGI expects header names to be lowercased.
        headers = [{"content-type": "application/json"}]
        scope: Scope = {
            "type": "http",
            "http_version": "1.1",
            "method": "POST",
            "headers": [[k.encode(), v.encode()] for k, v in headers[0].items()],
            "scheme": "https",
            "path": "/content-analysis/process",
            "query_string": b"",  # ASGI expects a byte string here
            "raw_path": None,
            "root_path": "",
            "server": ("mangum", 80),
            "client": ("", 0),
            "asgi": {"version": "3.0", "spec_version": "2.0"},
            "aws.event": self.event,
            "aws.context": self.context,
        }
        return scope

    def __call__(self, response: Response) -> dict:
        finalized_headers, multi_value_headers = handle_multi_value_headers(response["headers"])
        finalized_body, is_base64_encoded = handle_base64_response_body(
            response["body"], finalized_headers, self.config["text_mime_types"]
        )

        return {
            "statusCode": response["status"],
            "headers": handle_exclude_headers(finalized_headers, self.config),
            "multiValueHeaders": handle_exclude_headers(multi_value_headers, self.config),
            "body": finalized_body,
            "isBase64Encoded": is_base64_encoded,
        }

The handler above is registered as a custom handler with the code below:

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8008)
else:
    handler = Mangum(app, custom_handlers=[SqsHandler])

I also have an SQS queue triggering the Lambda function. When the Lambda receives a message, however, the message is not put back into the queue, so it is never retried.

Is there a reason why this is happening?

jaekunchoi • Oct 05 '23 13:10

"Returning" from the Lambda doesn't "put the message back"; that's not how the SQS-Lambda integration works. To re-queue a message you need to call boto3 with the specific SQS handling you want. A return from the Lambda only indicates to SQS that the message was processed successfully and should be removed from the queue, so all the data you return there is ignored.

pkit • Nov 02 '23 15:11
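
For reference, a minimal sketch of the re-queue approach described in the comment above. It is an illustration under assumptions, not part of Mangum: the function name `requeue_records` and the parameters `queue_url` and `delay_seconds` are hypothetical, and `sqs_client` is assumed to be a boto3 SQS client (for example `boto3.client("sqs")`), whose `send_message` call takes `QueueUrl`, `MessageBody` and `DelaySeconds`. Per-message delays are assumed to be acceptable here; they do not apply to FIFO queues.

```python
from typing import Any, Dict, List


def requeue_records(
    event: Dict[str, Any],
    sqs_client: Any,
    queue_url: str,
    delay_seconds: int = 30,
) -> List[str]:
    """Send the body of every SQS record in `event` back to the queue.

    `sqs_client` is assumed to behave like boto3.client("sqs").
    Returns the MessageId of each newly sent message.
    """
    message_ids: List[str] = []
    for record in event.get("Records", []):
        # Explicitly re-send the message; returning from the Lambda
        # will not do this on its own.
        response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=record["body"],
            DelaySeconds=delay_seconds,
        )
        message_ids.append(response["MessageId"])
    return message_ids
```

In a Lambda this could be called as `requeue_records(event, boto3.client("sqs"), queue_url)` at the point where processing is known to have failed, with `queue_url` supplied by your own configuration.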