
Add support for AWS SQS

Open arvidvillen opened this issue 3 years ago • 24 comments

Add support for AWS SQS: https://aws.amazon.com/sqs/

arvidvillen avatar Mar 21 '21 21:03 arvidvillen

@cescoffier

I would love to collaborate on this implementation, but I have a few theoretical doubts when it comes to backpressure and implementing the incoming connector.

First, a high-level overview of how SQS works. SQS offers a long polling mechanism (max 20 secs), can return up to a maximum of 10 messages per poll/batch, and AWS offers an async SDK. So a very simple implementation for a PublisherBuilder I came up with is to poll indefinitely and create a Multi with each of the items per batch. Note that the async SQS client from the SDK returns a completion stage:

    public PublisherBuilder<? extends Message<?>> getSource() {
        Multi<Message<?>> publisher = Uni.createFrom()
            .completionStage(() -> sqsClient.receiveMessage(m -> m.queueUrl(url)))
            .repeat().indefinitely()
            .invoke(() -> System.out.println("Log SQS invocation"))
            .onItem().transformToIterable(ReceiveMessageResponse::messages)
            .onItem().transform(Message::of);

        return ReactiveStreams.fromPublisher(publisher);
    }

However, given that this implementation would poll indefinitely, wouldn't this defeat the purpose of backpressure? Or will this only poll as long as downstream requests more items?

Finally, thank you very much for your time.

tcerda95 avatar Jun 03 '21 18:06 tcerda95

I don't know SQS, but Mutiny will only retry when there is a request. So, I believe the back pressure will be managed.
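
The idea of polling only when there is downstream demand can be illustrated with a minimal sketch. This is not Mutiny code and not the connector's implementation; it uses only the JDK's `java.util.concurrent.Flow` API. The names `DemandDrivenPoller` and `CollectingSubscriber` are hypothetical, and the `Supplier` stands in for the SQS `receiveMessage` call. The sketch is single-threaded and only shows that a new batch is fetched when the buffer is empty and demand is outstanding.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

/** Hypothetical publisher that fetches a new batch only while demand is outstanding. */
final class DemandDrivenPoller implements Flow.Publisher<String> {
    private final Supplier<List<String>> pollBatch; // assumption: stands in for the SQS receive call
    int polls = 0;                                  // number of batches fetched so far

    DemandDrivenPoller(Supplier<List<String>> pollBatch) {
        this.pollBatch = pollBatch;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final Queue<String> buffer = new ArrayDeque<>();
            private long demand = 0;
            private boolean draining = false;
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                if (n <= 0 || cancelled) {
                    return; // sketch only: invalid requests are ignored
                }
                // Accumulate demand, capping at Long.MAX_VALUE to avoid overflow.
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (draining) {
                    return; // re-entrant request from onNext; the outer loop picks it up
                }
                draining = true;
                try {
                    while (demand > 0 && !cancelled) {
                        if (buffer.isEmpty()) {
                            polls++;                        // poll only when there is demand
                            buffer.addAll(pollBatch.get()); // an empty batch triggers another poll
                            continue;
                        }
                        demand--;
                        subscriber.onNext(buffer.poll());
                    }
                } finally {
                    draining = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

/** Hypothetical subscriber that records items and lets the caller request explicitly. */
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(String item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { }
}
```

With a supplier that returns batches of 10, subscribing alone triggers no poll; requesting 3 items triggers one poll, and the remaining 7 buffered items are served before a second poll happens.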

cescoffier avatar Jun 03 '21 18:06 cescoffier

@tcerdaITBA I'd like to contribute to this as well. Have you made any progress on your side?

MihaiBogdanEugen avatar Dec 03 '21 22:12 MihaiBogdanEugen

Hello, is anyone still interested in working on this?

soulseekeer24 avatar Feb 03 '23 02:02 soulseekeer24

  1. Is there any way for now to use smallrye-reactive-messaging with SQS? As far as I know, it does not support Amazon SQS for now. Am I right?
  2. Is it possible to wrap the SQS client using JMS so I can integrate smallrye-reactive-messaging via the JMS connector?

damiankaplon avatar Feb 23 '23 14:02 damiankaplon

  1. You are right.

  2. No.

MihaiBogdanEugen avatar Feb 23 '23 16:02 MihaiBogdanEugen

Is anyone interested in working on adding SQS support? XD

soulseekeer24 avatar Feb 23 '23 16:02 soulseekeer24

Hello, is there a chance for this support? Maybe I can contribute somehow?

adampoplawski avatar May 07 '23 09:05 adampoplawski

Sure, a contribution would be more than welcome.

cescoffier avatar May 07 '23 17:05 cescoffier

I would like to help.

soulseekeer24 avatar May 07 '23 18:05 soulseekeer24

@tcerdaITBA are you still interested? It seems you did some work towards this, and now there are people willing to help.

adampoplawski avatar Jun 04 '23 05:06 adampoplawski

Hi I started with an implementation: https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

I think I have no issues with the actual implementation. But there are some concepts which confuse me a little bit. So maybe you can help me move it in the right direction. But first the progress.

Progress: Moved to the pr: #2402

holomekc avatar Oct 01 '23 17:10 holomekc

Questions:

  1. Project structure: I created an AWS module and a nested SQS module. Should I flatten the structure? I like the nested approach. What do you think?

  2. OpenTelemetry: TracingUtils starts and ends the trace immediately. Doesn't that violate the OpenTelemetry documentation? It says:

    • Call shouldStart(Context, Object) and do not proceed if it returns false.
    • Call start(Context, Object) at the beginning of a request.
    • Call end(Context, Object, Object, Throwable) at the end of a request.

    In SQS when I send a message I get the messageId only in the response of the SDK lib. I cannot add this information to the span. Why isn't AsyncOperationEndStrategy used? For batching it is different. There I need to generate the ids before sending. But from an implementation perspective it would be easier to do that after the response as well.

  3. Configuration: For CreateQueue I want to provide the option to define attributes (Map<String, String>) and tags (Map<String, String>). In case of sending messages I could use the metadata to provide this information. But this does not work for the incoming channel. The only way I found is to use the connector configuration. But Maps or Lists are not really supported. I need to parse from String manually (key1:value1,key2:value2 or value1,value2). This is something I cannot do inside of the message streams over and over again. It is a bit strange, because in MicroProfile Config you could define maps. Quarkus also does this all the time.

    In general I think it depends on the use case. I think for creation it is fine, because you most likely do not create that many queues. But what is the proper way to achieve something like this? I saw that sometimes beans are used to provide configuration. This is a bit strange to me.

  4. Graceful shutdown: I started with a terminate method and @BeforeDestroy. I saw that in the other implementations. But this is not really graceful. I cannot simply destroy the SDK client and the channels. I need to count some things. E.g. in case 10 messages are received I need to wait for the processing and I need to wait for the deletion/confirmation. I did this already in the past in a Quarkus lib. There the approach was clearer to me. Am I allowed to block the terminate method? It could take some time until the processing is done.
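
The counting described in question 4 could look roughly like the following sketch. It uses only the JDK; `InFlightTracker` and its methods are hypothetical names, not part of SmallRye Reactive Messaging or the AWS SDK. Whether blocking inside the connector's terminate method is acceptable remains the open question; the sketch only shows how the wait can be bounded by a timeout.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical tracker for messages that were received but not yet processed and deleted. */
final class InFlightTracker {
    private int inFlight = 0;
    private boolean closed = false;

    /** Registers a received message; returns false once shutdown has started. */
    synchronized boolean tryAcquire() {
        if (closed) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Called after a message has been processed and its deletion was confirmed. */
    synchronized void release() {
        inFlight--;
        if (inFlight == 0) {
            notifyAll(); // wake up a waiting shutdown
        }
    }

    /**
     * Stops intake and waits until all in-flight messages are released or the timeout elapses.
     * Returns true if everything drained in time, false on timeout or interruption.
     */
    synchronized boolean awaitDrained(long timeout, TimeUnit unit) {
        closed = true;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (inFlight > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A poller would call `tryAcquire()` for each received message and `release()` after the delete call completes; the shutdown hook would call `awaitDrained(...)` before closing the SDK client.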

Thank you for some help.
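
Regarding question 3 above, one way to avoid re-parsing inside the message stream is to parse the `key1:value1,key2:value2` string once when the connector is created and keep the resulting map. A minimal sketch, assuming that format; `ConnectorConfigParser` and `parseMap` are hypothetical names, and values containing commas are not supported:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that parses "key1:value1,key2:value2" once at connector startup. */
final class ConnectorConfigParser {

    private ConnectorConfigParser() {
    }

    static Map<String, String> parseMap(String raw) {
        Map<String, String> result = new LinkedHashMap<>(); // keeps declaration order
        if (raw == null || raw.isBlank()) {
            return result;
        }
        for (String entry : raw.split(",")) {
            // Split on the first ':' only, so values may themselves contain ':'.
            String[] keyValue = entry.split(":", 2);
            if (keyValue.length != 2 || keyValue[0].isBlank()) {
                throw new IllegalArgumentException(
                        "Invalid entry '" + entry + "', expected key:value");
            }
            result.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return result;
    }
}
```

The parsed map can then be stored in a final field of the connector's channel object, so that the stream only reads it.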

holomekc avatar Oct 01 '23 17:10 holomekc

@cescoffier can we get support with code review? It seems more people want to contribute.

adampoplawski avatar Oct 21 '23 16:10 adampoplawski

Sure, @ozangunalp and I would be happy to review the code.

cescoffier avatar Oct 21 '23 17:10 cescoffier

Hello @cescoffier @ozangunalp. Sorry to ping again, but maybe the review was forgotten by accident :)

adampoplawski avatar Nov 12 '23 14:11 adampoplawski

@adampoplawski where is the PR?

cescoffier avatar Nov 12 '23 14:11 cescoffier

Hello @cescoffier. There is a branch plus questions from @holomekc on how to proceed; sorry for the misinformation. https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

adampoplawski avatar Nov 12 '23 14:11 adampoplawski

I did not create a PR yet, because I did not write tests etc. yet. As @adampoplawski mentioned, I wrote some questions. I needed to chill a little bit. I was working too much. I will try to finish what I started asap. I have some free time soon-ish.

holomekc avatar Nov 12 '23 15:11 holomekc

Thanks for this. There is now a connector contribution guide if it may help you: https://smallrye.io/smallrye-reactive-messaging/4.11.0/concepts/contributing-connectors/

ozangunalp avatar Nov 13 '23 09:11 ozangunalp

I created the pr. It is not done yet, but maybe review, help etc. is easier to achieve: #2402

holomekc avatar Dec 06 '23 19:12 holomekc

Are we planning to merge it anytime soon? Eagerly awaiting this....

spc16670 avatar Jan 13 '24 17:01 spc16670

Nope. Feel free to help. There is still much to do.

holomekc avatar Jan 13 '24 19:01 holomekc

@cescoffier @ozangunalp There is a new PR.

adampoplawski avatar Mar 18 '24 17:03 adampoplawski