Add a spring-cloud-stream-binder for Amazon SQS
From @mstine on September 19, 2015 0:58
Seems like this should be relatively easy by leveraging the SQS support in Spring Cloud AWS. We'd like to use this to support https://github.com/spring-cloud/spring-cloud-netflix/issues/545, so that we can use SQS when running Spring Cloud Services on PWS.
Copied from original issue: spring-cloud/spring-cloud-stream#135
This would be extremely useful, especially if it could also support sending to an SNS topic and consuming from an SQS queue.
@tvrmsmith ,
Indeed, we can configure an SQS queue to subscribe to an SNS topic.
But what if we just had a Kinesis Binder instead and gained the whole set of possibilities it brings? https://aws.amazon.com/kinesis/streams/faqs/
any update?
I would love for AWS SQS to be supported. SQS support would be a big help for our project.
I wonder why not Kinesis: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis ?
SQS doesn't fully fit the SCSt communication model: there are no publish-subscribe semantics, no partition or offset management, etc.
I would say SNS can achieve similar pub-sub semantics, and combined with SQS it can make SNS topics durable, in a sense. Also, most of my use cases don't need partitioning or offset management, yet I still find SCS useful without them. This could be because I don't understand the best scenarios for those features, though.
Edit: Sorry, spoke too soon. After re-reading the SCS docs about partitioning, I believe partitioning can be achieved by using separate SQS queues for each partition and subscribing the queues to the SNS topic in use.
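As a rough sketch of that idea against the AWS SDK for Java v1 only: the topic ARN, the "partition" message attribute, the hash-based partition selection, and the assumption that each per-partition queue subscribes with a matching filter policy are all made up for illustration.
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class PartitionedSnsPublisher {

    private final AmazonSNS sns = AmazonSNSClientBuilder.defaultClient();

    // Assumptions for illustration: the topic ARN and a fixed partition count of 4.
    private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:orders";
    private static final int PARTITION_COUNT = 4;

    public void send(String partitionKey, String body) {
        // The same key always maps to the same partition, mimicking SCSt partition selection.
        int partition = Math.abs(partitionKey.hashCode()) % PARTITION_COUNT;
        PublishRequest request = new PublishRequest(TOPIC_ARN, body)
                .addMessageAttributesEntry("partition", new MessageAttributeValue()
                        .withDataType("String")
                        .withStringValue(String.valueOf(partition)));
        // Each per-partition SQS queue would subscribe to the topic with a filter policy
        // matching its own "partition" attribute value, so only that queue receives the message.
        sns.publish(request);
    }
}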
I see your point, @tvrmsmith .
Thank you for sharing!
Unfortunately we don't have the resources to jump into this area.
Contributions are of course welcome!
I've done some work on this but haven't found the time to get it into a working state.
Also, I just want to thank you guys for making Spring so awesome.
Ha-ha! No, the big thanks goes to you, the Community. Without your comprehensive feedback and valuable contributions, Spring wouldn't be so awesome! 😉
I've done some work putting together a project for this, but am running into an issue where the SQS API only accepts Strings for its message payloads. This would mean the default MessageConverters would have to be extended to send String representations (or keep the original String representations instead of converting to byte[]). I think this would work, but it would mean the project wouldn't conform to https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#mime-types-and-java-types, which seems quite fundamental, so maybe SQS just doesn't fit? @artembilan if you disagree I can continue down that route and finish up an initial version of this binder.
If you want to know my opinion, I don't see a reason for this binder since we have the Kinesis one. But if you insist, I can come back and think about this after Spring One.
Anyway even byte[] can be represented as a String via Base64 encoding:
java.util.Base64
...
/**
* Encodes the specified byte array into a String using the {@link Base64}
* encoding scheme.
*
* <p> This method first encodes all input bytes into a base64 encoded
* byte array and then constructs a new String by using the encoded byte
* array and the {@link java.nio.charset.StandardCharsets#ISO_8859_1
* ISO-8859-1} charset.
*
* <p> In other words, an invocation of this method has exactly the same
* effect as invoking
* {@code new String(encode(src), StandardCharsets.ISO_8859_1)}.
*
* @param src
* the byte array to encode
* @return A String containing the resulting Base64 encoded characters
*/
@SuppressWarnings("deprecation")
public String encodeToString(byte[] src) {
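For illustration only (not part of the quoted JDK source), a minimal round trip showing that arbitrary bytes survive an SQS-friendly String body:
import java.util.Arrays;
import java.util.Base64;

public class Base64SqsBodyExample {
    public static void main(String[] args) {
        byte[] payload = { 0x00, 0x01, (byte) 0xFF };                  // arbitrary binary payload
        String sqsBody = Base64.getEncoder().encodeToString(payload);  // String, safe for an SQS message body
        byte[] restored = Base64.getDecoder().decode(sqsBody);         // identical to the original bytes
        System.out.println(Arrays.equals(payload, restored));          // true
    }
}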
Right, but in the case of text/plain on the inbound channel, how do we know if the payload is the original string or a Base64-encoded string that we need to decode and convert? It could be either, correct?
Shouldn't the content type indicate to the inbound channel how to decode/parse the message?
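One possible sketch of that idea, assuming the producer sets an explicit contentType header (e.g. application/octet-stream for Base64-encoded binary); the header values and decision logic here are illustrative, not part of any binder:
import java.util.Base64;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;

public class InboundSqsPayloadDecoder {

    // Decide whether the SQS String body is the original text or Base64-encoded bytes,
    // based purely on the contentType header carried with the message.
    public Object decode(Message<String> message) {
        Object contentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
        if (contentType != null
                && MimeType.valueOf(contentType.toString())
                        .isCompatibleWith(MimeType.valueOf("application/octet-stream"))) {
            return Base64.getDecoder().decode(message.getPayload());
        }
        return message.getPayload(); // e.g. text/plain: keep the original String
    }
}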
@artembilan As far as I understand, Kinesis doesn't support consumed-once semantics.
@chrylis ,
Well, you know, we ensure that with a checkpoint for each consumed shard and store an offset for the records in a DynamoDB table.
So, I believe this should somehow meet the consumed-once semantics requirement.
Anyway, I don't object to an AWS SQS+SNS Binder. We just don't have the resources right now to jump into this feature. My point is to use an existing solution as a fallback for the time being.
Thank you for the feedback!
Hi all, any updates?
I've started working on an SQS binder: https://github.com/maciejwalkowiak/spring-cloud-stream-binder-sqs. Integration and most of the customisation options are still missing, but the basics seem to work in simple scenarios. If someone would like to join the effort - PRs are welcome! 😃
Looks great at a glance!
A couple remarks:
- consider renaming all the kinesis variables to sqs 😉. That's minor
- I think you are missing the big picture about consumer groups: http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.1.RELEASE/multi/multi__main_concepts.html#consumer-groups. For that purpose I suggest having an SnsMessageHandler on the producer side to publish to the SNS topic instead, for possible distribution between the subscribed SQS queues. The SQS queue can then be named after the provided group and subscribed to the SNS topic via Topics.subscribeQueue(). Just an idea for how to deal with consumer groups in Spring Cloud Stream.
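A rough sketch of that wiring, assuming spring-integration-aws on the AWS SDK v1; the topic ARN, queue naming, and bean names are made up for illustration:
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aws.outbound.SnsMessageHandler;
import org.springframework.messaging.MessageHandler;

@Configuration
public class SnsConsumerGroupSketch {

    private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:orders";

    // Producer side: publish to the SNS topic instead of a concrete queue.
    @Bean
    public MessageHandler ordersSnsMessageHandler(AmazonSNSAsync amazonSns) {
        SnsMessageHandler handler = new SnsMessageHandler(amazonSns);
        handler.setTopicArn(TOPIC_ARN);
        return handler;
    }

    // Consumer side: one SQS queue per consumer group, subscribed to the topic,
    // so every group receives its own copy of each message (pub-sub semantics).
    public String subscribeGroupQueue(AmazonSNSAsync sns, AmazonSQS sqs, String group) {
        String queueUrl = sqs.createQueue("orders-" + group).getQueueUrl();
        return Topics.subscribeQueue(sns, sqs, TOPIC_ARN, queueUrl);
    }
}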
Thanks so far for the effort!
Thank you @artembilan for the quick feedback! Sure, there are a lot of things to be done there. Thanks for the hints on how to deal with customer groups.
After making it work well with SQS only, I started implementing customer group support with SNS, and I am running into one issue; perhaps @artembilan you can guide me.
I need to register two extra converters in the ConversionService (integrationConversionService) used by SnsMessageHandler. I thought putting @IntegrationConverter on the bean would do the job, but it seems it's not getting registered.
The code is on the branch here: https://github.com/maciejwalkowiak/spring-cloud-stream-binder-sqs/pull/8/
@maciejwalkowiak ,
first of all, it is called a consumer group. It has nothing to do with customers 😄
Your @IntegrationConverter is not visible as a bean to the IntegrationConverterInitializer because the @Configuration it is declared in is instantiated later via the DefaultBinderFactory.
Anyway, this is not what I would recommend doing. Such a converter in the global ConversionService is going to have an effect anywhere the ConversionService is used with byte[] and String.
To fix it properly, I would suggest a postProcessOutputChannel() implementation with an extra ChannelInterceptor to convert the incoming byte[] to String before the message reaches the SnsMessageHandler.
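For reference, a minimal sketch of such an interceptor (the UTF-8 charset choice is an assumption):
import java.nio.charset.StandardCharsets;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

// Converts byte[] payloads to String just before they reach the SnsMessageHandler,
// instead of registering a converter in the global ConversionService.
public class BytesToStringChannelInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        if (message.getPayload() instanceof byte[]) {
            String payload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
            return MessageBuilder.withPayload(payload)
                    .copyHeaders(message.getHeaders())
                    .build();
        }
        return message;
    }
}
The binder's postProcessOutputChannel() could then cast the output channel to AbstractMessageChannel and call addInterceptor(new BytesToStringChannelInterceptor()).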
first of all, it is called a consumer group. It has nothing to do with customers 😄
Too many business podcasts 🤦♂️
@artembilan, thank you for the advice, adding the interceptor in postProcessOutputChannel works like a charm!
In case you found yourself in this thread from Google, we have built Spring Cloud Stream binders for SQS and SNS. You can find them over here:
We've been using them in production without issues for about half a year now. They also support the SNS fanout pattern (i.e. you publish to SNS and have multiple queues subscribed to it, which are then consumed by another service).
This is great @mKeRix, I just added them to the front page - https://spring.io/projects/spring-cloud-stream
@mKeRix I saw you archived the repo. Are you aware of any plans to implement such a binder officially in SCS?