spring-cloud-aws
Add SQS Support
Resolves #344
Adds support for listening to SQS queues and handling messages. (WIP)
Features
- See SqsIntegrationTests for usage examples and features
- Provides almost all features from the previous version and a few new ones
- Full async support via CompletableFuture
- Fully configurable via familiar Spring abstractions
Shortcomings (for now...)
- No documentation or unit tests
- Template not implemented
- Support for FIFO queues not implemented
- Lots of ground yet to cover 😄
I tried to follow the provided general guidelines and what I thought would make sense for the project. I'm ok with discussing and/or changing any of it, both design- and implementation-wise.
I'll wait for feedback before adding anything else - hopefully this will be a good enough fit for us to work together on going forward. If it doesn't look right, or if another direction is preferred, no worries: we can set it aside and let the team come up with something else later.
As a note, there's a messaging-support module that provides a set of abstractions for adding support for any number of messaging systems - Kinesis comes to mind, but there are surely others in the AWS landscape. It was largely inspired by the design of the Spring Kafka project, a battle-tested design that most users and engineers should be comfortable with. If that seems like overkill for this project, it should be easy enough to merge the two modules and get rid of some abstractions.
This is a great project and I'm really thrilled to perhaps be taking a small part in building it! 😄
Thanks!
Thanks a lot @tomazfernandes 🔥! I will try to review it in the upcoming days. Regarding Kinesis - very likely we won't do anything about it, since there is Kinesis support in the Spring Cloud Stream project, but perhaps we can use it for MQTT #38.
After a very quick look: we've decided to drop @EnableXXX annotations and support only auto-configurations. There should still be a way for users who do not rely on auto-configurations to use the SQS integration, but they should then manually do what the auto-configuration does. With @EnableXXX we would have to register bean definitions, which causes headaches we wanted to avoid.
Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :)
Quick question @maciejwalkowiak - there seems to be an error related to setting the AWS region on the CI build. Since this is a proverbial case of "it works on my machine" 😄, I wonder if it is a known issue with a known solution. If not, I can look into it.
Thanks!
Sounds good! Maybe we can work on a PoC and see how it looks.
@arunkpatra plans to take a look, so let's keep him in the loop.
Regarding the AWS region on CI - it is somewhat on purpose. We try to leverage Localstack as much as possible, and unless we hit a wall with an AWS feature that Localstack does not support, all tests should run against it (you can take a look at how the other integration tests are set up).
I can't wait to dig deeper into this PR!
Regarding Sonar - feel free to question what it reports. We've set up a default configuration, and not everything it reports is necessarily important from our POV.
One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible, it should be possible to adjust spring-integration-aws to the new implementation.
@arunkpatra plans to take a look, so let's keep him in the loop.
Sure, no hurry.
Regarding the AWS region on CI - it is somewhat on purpose. We try to leverage Localstack as much as possible, and unless we hit a wall with an AWS feature that Localstack does not support, all tests should run against it (you can take a look at how the other integration tests are set up).
Sure, I'm using LocalStack for the tests. I guess this is related to the changes in configuration properties you mentioned - I'll try that and update the PR.
I can't wait to dig deeper into this PR!
Cool, looking forward to your review too!
Regarding Sonar - feel free to question what it reports. We've set up a default configuration, and not everything it reports is necessarily important from our POV.
I've actually enjoyed most of what it reported 😄 Of course, some less-than-critical things, but nothing to write home about.
What I didn't change were some things related to duplicated code in legacy classes, and a couple of Thread.sleep calls in the ITs for seeing how the system performs under load - we should probably remove them afterwards.
One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible, it should be possible to adjust spring-integration-aws to the new implementation.
I see. Maybe we'll need to make a couple of adjustments, but at first glance it seems simple enough. I'll keep that in mind, and maybe we can involve them when it's appropriate.
Thanks!
Sure, I'm using LocalStack for the tests. I guess this is related to the changes in configuration properties you mentioned - I'll try that and update the PR.
Well, it's been a while since I last dealt with AWS configuration issues, and I'm not sure exactly how these properties are mapped by the framework. I'll take a closer look on Monday and also implement auto-configuration for this - with that we'll probably have a properly configured SqsAsyncClient for the CI tests.
In the meantime, let me know if there are any further questions or concerns about any of this.
Thanks!
@tomazfernandes I pushed fixing tests to your branch
Nice, thanks @maciejwalkowiak!
thanks for your contribution @tomazfernandes !
Thanks a lot for looking into this @eddumelendez!
I have left some small comments.
Sure, I'll address them, thanks.
I will be checking out Spring Kafka during the week in order to get more familiar with it.
Sounds great! Let me know if there's anything I can help with - I've been contributing to that project for a bit more than a year now and know the codebase reasonably well at this point - and surely would enjoy discussing both that and this design.
Also, I'm not sure how familiar you are with the previous version of the SQS integration - that would be a good thing to look at, as I took a few classes and some code from there. Of course, we can rewrite anything we feel should be different.
A few quick notes:
- I tried to make use of what I thought was essential from the Spring Kafka project, but I didn't try to make an exact copy, so perhaps there are some things I may have missed. I got 'creative' with the container part: there we have a large KafkaMessageListenerContainer class that handles most of the runtime logic, while here I tried to break it down into smaller components, such as Ack, MessageProducer and MessageListener.
- Some of the design for this PR was constrained by the use of the MessageHandler interface, which held the main logic for the previous version, including annotation processing and runtime method resolving / invoking. I think we could look into decoupling those roles, separating annotation processing from method invoking.
Please let me know if there's anything else I can help with. Thanks!
I've made a few performance adjustments and ran some preliminary tests against AWS.
For a load of 1K messages with a 1s processing time (Thread.sleep), the result was:
10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742
Running the same test with the current 2.4 version, the result was:
65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359
Of course, that difference is somewhat expected given the 10 threads / queue limitation of the previous version, but it should still be a nice gain for users. Performance improves further when there are more than 1K messages in the queue, e.g. left over from a previously interrupted test.
I think we can expect better performance from a truly async workload, e.g. a listener that calls other async methods, such as saving something to DynamoDB or making async HTTP calls, and returns a CompletableFuture<Void>, instead of a blocking load such as Thread.sleep. I'll look into implementing such a test.
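To make the blocking vs. async distinction concrete, here is a minimal sketch in plain Java, with no Spring Cloud AWS types involved. `AsyncListener` is a hypothetical interface, not the API in this PR. The blocking variant holds one of 10 pool threads for each 200 ms `Thread.sleep`, while the non-blocking variant schedules its delay with `CompletableFuture.delayedExecutor` (standing in for an async HTTP or DynamoDB call) and frees the thread right away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncListenerSketch {

    /** Hypothetical listener contract: the returned future completes when processing is done. */
    interface AsyncListener {
        CompletableFuture<Void> onMessage(String message);
    }

    /** Simulates async I/O: completes after 200 ms without parking a worker thread meanwhile. */
    static final AsyncListener NON_BLOCKING = message ->
            CompletableFuture.runAsync(() -> { },
                    CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));

    /** Blocking variant: each message occupies one pool thread for the whole 200 ms. */
    static AsyncListener blocking(ExecutorService pool) {
        return message -> CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    /** Hands every message to the listener and returns the elapsed wall-clock time in ms. */
    static long process(AsyncListener listener, int messages) {
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < messages; i++) {
            futures.add(listener.onMessage("message-" + i));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            System.out.println("blocking, 10 threads: " + process(blocking(pool), 50) + " ms");
        } finally {
            pool.shutdown();
        }
        System.out.println("non-blocking:         " + process(NON_BLOCKING, 50) + " ms");
    }
}
```

With 50 messages, the blocking run needs roughly five rounds of 200 ms on 10 threads, while the async run should finish in about one delay period; exact timings will vary by machine.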
Also, probably there's room for performance improvement in the logic itself - we can look into that in the future.
Thanks!
I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface with a BeanPostProcessor for annotation processing / Endpoint creation and an InvocableHandlerMethod for method invocation.
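A rough sketch of that split, assuming plain Java reflection instead of Spring's BeanPostProcessor and InvocableHandlerMethod: scan a bean's methods for a listener annotation, turn each hit into an Endpoint holding the queue names and factory bean name (assembly time), and later invoke the method reflectively (runtime). `@QueueListener` is a hypothetical stand-in for @SqsListener, and the default factory name is made up for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListenerScanningSketch {

    /** Hypothetical stand-in for @SqsListener, so the sketch has no Spring dependency. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface QueueListener {
        String[] value();
        String factory() default "defaultListenerContainerFactory";
    }

    /** Assembly-time data extracted from one annotated method. */
    record Endpoint(List<String> queueNames, String factoryBeanName, Object bean, Method method) {

        /** Runtime step: invokes the listener method, roughly the role of an invocable handler method. */
        void invoke(String payload) {
            try {
                method.setAccessible(true);
                method.invoke(bean, payload);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            } catch (InvocationTargetException e) {
                throw new IllegalStateException("Listener method failed", e.getCause());
            }
        }
    }

    /** Scans one bean, as a BeanPostProcessor would do for every bean it sees. */
    static List<Endpoint> scan(Object bean) {
        List<Endpoint> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            QueueListener annotation = method.getAnnotation(QueueListener.class);
            if (annotation != null) {
                endpoints.add(new Endpoint(Arrays.asList(annotation.value()),
                        annotation.factory(), bean, method));
            }
        }
        return endpoints;
    }

    static class OrderListeners {
        final List<String> received = new ArrayList<>();

        @QueueListener({"orders", "orders-retry"})
        void onOrder(String message) {
            received.add("order:" + message);
        }

        @QueueListener(value = "audit", factory = "auditFactory")
        void onAudit(String message) {
            received.add("audit:" + message);
        }

        void notAListener(String message) { }
    }

    public static void main(String[] args) {
        OrderListeners bean = new OrderListeners();
        for (Endpoint endpoint : scan(bean)) {
            System.out.println(endpoint.method().getName() + " -> " + endpoint.queueNames()
                    + " via " + endpoint.factoryBeanName());
            endpoint.invoke("hello");
        }
        System.out.println(bean.received);
    }
}
```

Keeping scanning (`scan`) separate from invocation (`Endpoint.invoke`) mirrors the assembly-time vs. runtime decoupling described in the comment.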
With that we'd further decouple assembly-time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture<Void> and batch-processing listener methods that receive a Collection<Message> as an argument.
And after that, add auto-configuration with this new structure.
Since that should be quite a bit of work, I think I'll let the team catch up with what we already have first, to make sure we're all on the same page.
Thanks and please let me know if there's anything I can help with.
I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface with a BeanPostProcessor for annotation processing / Endpoint creation and an InvocableHandlerMethod for method invocation.
With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture<Void> and batch processing listener methods that receive a Collection<Message> as an argument.
And after that, add auto-configuration with this new structure.
Sounds like a reasonable thing to do. Go for it 👍
The implementation looks like you definitely know what you're doing, but I am getting lost in the abstractions.
Yeah, well, I definitely mostly know what I'm doing 😄 The bits from Spring Kafka definitely help by providing a solid foundation to build upon.
I am 100% confident we should continue with this design but we either have to simplify it (for example by merging these two modules) or document it.
- Simplification - I understand the reasoning for keeping them separate. The only other possible implementation I can think of is AWS IoT, but I'm not sure it would really fit.
- Documentation - at least at the Javadoc level, it must come anyway. But I think it would be really, really great if you could describe, either in words or with a diagram, how all of these pieces are meant to work together.
Sure, that was on my mind too, especially given this is a subproject rather than an entire project of its own.
I can add some javadocs and create some diagrams, as you suggested, to help everyone navigate the code more easily. I actually looked into creating the diagrams today, but failed to find a proper tool. I'll give IntelliJ's UML diagrams another shot. Otherwise, I can also describe in words how the components are related - it's not that hard once you get the bigger picture.
That should give everyone a better sense of how the components are related and what role they play at assembly time and at runtime. I think we'll then be able to get a better feel for the overall complexity and whether and how we should reduce it - including whether to merge the modules or not.
I see that the container can have a single interceptor - perhaps we should consider allowing more? For example, adding the possibility to measure execution time and send it via Micrometer, or adding tracing with Sleuth.
Yes, I'm aware of that. TBH, I just wanted to be able to implement the visibility extension feature, so I only added the one 😄 I'll change that to a list of interceptors.
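A minimal sketch of what moving from a single interceptor to an ordered list could look like. `MessageInterceptor` and `Container` here are hypothetical simplifications (messages are plain strings), not the classes in this PR; real interceptors for visibility extension, Micrometer timing or Sleuth tracing would likely need richer hooks than this single pre-processing method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class InterceptorChainSketch {

    /** Hypothetical interceptor contract: may transform a message before it reaches the listener. */
    interface MessageInterceptor {
        String intercept(String message);
    }

    /** Minimal container holding an ordered list of interceptors instead of a single one. */
    static class Container {
        private final List<MessageInterceptor> interceptors = new ArrayList<>();
        private final Consumer<String> listener;

        Container(Consumer<String> listener) {
            this.listener = listener;
        }

        void addInterceptor(MessageInterceptor interceptor) {
            interceptors.add(interceptor);
        }

        /** Applies every interceptor in registration order, then invokes the listener. */
        void dispatch(String message) {
            String current = message;
            for (MessageInterceptor interceptor : interceptors) {
                current = interceptor.intercept(current);
            }
            listener.accept(current);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Container container = new Container(received::add);
        container.addInterceptor(String::trim);
        container.addInterceptor(msg -> "[traced] " + msg);
        container.dispatch("  hello  ");
        System.out.println(received); // [[traced] hello]
    }
}
```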
Please add auto-configuration (we do not need @EnableSqs)
Sure. This will come after the BeanPostProcessor change, since that impacts how the feature is bootstrapped.
and tests. I imagine the integration tests cover a big chunk of this change, and I do not mean that we need a unit test for each class, but I do believe some pieces here should be unit tested.
Sure, I'll implement UTs and ITs once this is a bit more mature. Same with javadocs and documentation.
@maciejwalkowiak, I've added simple javadocs to the assembly-time components just to give a jump start on this.
Below is a high-level view of the design, focusing first on the assembly-time components and flows, which I think are harder to understand. Feel free to ask if you (or anyone else) have any questions or suggestions.
Afterwards we can dive into the runtime components, and some other aspects of the design.
I hope this makes sense to everyone 😄
Component types
- Assembly-time components are invoked during application startup to create the runtime components. Those are the components in the config and endpoint packages, such as MessageListenerContainerFactory and Endpoint.
- Runtime components - basically everything used at runtime, such as method invocation, payload conversion, and the MessageListenerContainer with its many sub-components.
Abstraction levels
In terms of abstraction, I think in three levels:
- Interface level - top-level abstractions. As a rule of thumb, they should only interact with other interfaces.
- Abstract level - base classes that implement the interfaces. They should only interact with other abstract-level components and interfaces.
- Concrete level - lower-level abstractions that represent concrete logic for a given messaging system (SQS).
In the messaging-support module we have the first two levels of abstraction, and in the sqs module we have the third.
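To illustrate the three levels, here is a minimal sketch that follows the naming pattern described in this comment (interface, Abstract prefix, Sqs prefix). The method signatures are assumptions for the example, and the concrete class only records log lines instead of polling a real queue. The abstract class owns the shared lifecycle and exposes `doStart`/`doStop` hooks for system-specific logic.

```java
import java.util.ArrayList;
import java.util.List;

public class AbstractionLevelsSketch {

    /** Interface level: top-level abstraction that only refers to other interfaces. */
    interface MessageListenerContainer {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Abstract level: shared lifecycle logic, no knowledge of any specific messaging system. */
    abstract static class AbstractMessageListenerContainer implements MessageListenerContainer {
        private volatile boolean running;

        @Override
        public final void start() {
            doStart();
            running = true;
        }

        @Override
        public final void stop() {
            running = false;
            doStop();
        }

        @Override
        public boolean isRunning() {
            return running;
        }

        /** Hooks that concrete containers implement for their messaging system. */
        protected abstract void doStart();

        protected abstract void doStop();
    }

    /** Concrete level: SQS-specific behavior, simulated here by recording log lines. */
    static class SqsMessageListenerContainer extends AbstractMessageListenerContainer {
        final List<String> log = new ArrayList<>();
        private final String queueName;

        SqsMessageListenerContainer(String queueName) {
            this.queueName = queueName;
        }

        @Override
        protected void doStart() {
            log.add("start polling " + queueName);
        }

        @Override
        protected void doStop() {
            log.add("stop polling " + queueName);
        }
    }

    public static void main(String[] args) {
        SqsMessageListenerContainer container = new SqsMessageListenerContainer("my-queue");
        container.start();
        container.stop();
        System.out.println(container.log); // [start polling my-queue, stop polling my-queue]
    }
}
```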
Assembly time flow
This is a bit more complicated to illustrate in writing, but hopefully it'll be helpful.
- SqsEndpointMessageHandler
The SqsEndpointMessageHandler bean detects @SqsListener annotations and extracts the information that will be used to populate the SQSEndpoint instances. It implements the EndpointRegistry interface, which means it stores the endpoints for later processing.
It also gathers the methods that will be invoked to handle messages at runtime.
This is the part that should be replaced by the BeanPostProcessor approach.
- EndpointProcessor
Then the EndpointProcessor fetches the EndpointRegistry bean (in our case the message handler above) and feeds the Endpoint instances to the corresponding MessageListenerContainerFactory instance. In this version, the Endpoint has the factoryBeanName field - Spring Kafka handles this differently, in a more complex way.
The factory creates the appropriate MessageListenerContainer instance, which is then registered in the ListenerContainerRegistry.
- MessageListenerContainerFactory
The name of the MessageListenerContainerFactory bean comes from the Endpoint instance: it is either specified through the @SqsListener annotation or falls back to the default value.
Each factory has a FactoryOptions instance (user-provided or default). The AbstractFactory should only know about the AbstractFactoryOptions and FactoryOptions, so any configuration the AbstractFactory needs to do its work should be there. The same goes for the AbstractEndpoint.
The factory combines configuration from the FactoryOptions with configuration from the Endpoint to create a ContainerOptions instance.
Then it creates the MessageListenerContainer instance.
- MessageListenerContainer
This is the class that will be used at runtime to fetch and process the actual messages. This part is more straightforward, I guess. Currently, all queue names provided in the same @SqsListener annotation are handled by the same container, although each queue has its own MessageProducer.
Again, the AbstractMessageListenerContainer should only know about AbstractContainerOptions.
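As a rough illustration of the last point (one container per listener, one MessageProducer per queue), here is a hypothetical simplification: the producer returns fake messages instead of polling SQS, and the class and method names are assumptions rather than the PR's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PerQueueProducerSketch {

    /** Hypothetical producer: in a real container this would poll one SQS queue. */
    static class MessageProducer {
        private final String queueName;

        MessageProducer(String queueName) {
            this.queueName = queueName;
        }

        /** Returns a batch of fake messages instead of calling SQS. */
        List<String> produce(int count) {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                batch.add(queueName + "#" + i);
            }
            return batch;
        }
    }

    /** One container for all queues of a listener, with one producer per queue. */
    static class MessageListenerContainer {
        final Map<String, MessageProducer> producers = new LinkedHashMap<>();
        private final Consumer<String> listener;

        MessageListenerContainer(List<String> queueNames, Consumer<String> listener) {
            queueNames.forEach(name -> producers.put(name, new MessageProducer(name)));
            this.listener = listener;
        }

        /** Polls each queue once and hands every message to the shared listener. */
        void pollOnce(int batchSize) {
            producers.values().forEach(producer -> producer.produce(batchSize).forEach(listener));
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        MessageListenerContainer container =
                new MessageListenerContainer(List.of("orders", "payments"), handled::add);
        container.pollOnce(2);
        System.out.println(handled); // [orders#0, orders#1, payments#0, payments#1]
    }
}
```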
So the relationship is:
Interface level: MessageListenerContainerFactory (processes) Endpoint (with) FactoryOptions (to create) MessageListenerContainer (with) ContainerOptions
Abstract level: Same but beginning with Abstract 😄
Concrete level: Same but beginning with SQS
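The interface-level relationship above can be sketched end to end in plain Java. All option names (`maxInflightMessagesPerQueue`, `pollTimeout`) and the precedence rule (endpoint values override factory values when set) are assumptions for the example, and a `Map` stands in both for looking up the factory bean by its `factoryBeanName` and for the ListenerContainerRegistry.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssemblyFlowSketch {

    /** Factory-wide settings (user provided or default). */
    record FactoryOptions(int maxInflightMessagesPerQueue, Duration pollTimeout) {
        static FactoryOptions defaults() {
            return new FactoryOptions(10, Duration.ofSeconds(10));
        }
    }

    /** Per-listener settings; a null value means "use the factory's value". */
    record Endpoint(String id, List<String> queueNames, String factoryBeanName,
                    Integer maxInflightMessagesPerQueue, Duration pollTimeout) { }

    /** Fully resolved settings handed to the container. */
    record ContainerOptions(int maxInflightMessagesPerQueue, Duration pollTimeout) { }

    record MessageListenerContainer(String id, List<String> queueNames, ContainerOptions options) { }

    /** Factory (processes) Endpoint (with) FactoryOptions (to create) Container (with) ContainerOptions. */
    static class MessageListenerContainerFactory {
        private final FactoryOptions factoryOptions;

        MessageListenerContainerFactory(FactoryOptions factoryOptions) {
            this.factoryOptions = factoryOptions;
        }

        MessageListenerContainer createContainer(Endpoint endpoint) {
            // Endpoint values take precedence over factory values when they are set.
            int maxInflight = endpoint.maxInflightMessagesPerQueue() != null
                    ? endpoint.maxInflightMessagesPerQueue()
                    : factoryOptions.maxInflightMessagesPerQueue();
            Duration pollTimeout = endpoint.pollTimeout() != null
                    ? endpoint.pollTimeout()
                    : factoryOptions.pollTimeout();
            return new MessageListenerContainer(endpoint.id(), endpoint.queueNames(),
                    new ContainerOptions(maxInflight, pollTimeout));
        }
    }

    /** EndpointProcessor step: feeds each endpoint to the factory named by its factoryBeanName. */
    static Map<String, MessageListenerContainer> process(
            List<Endpoint> endpoints, Map<String, MessageListenerContainerFactory> factoriesByBeanName) {
        Map<String, MessageListenerContainer> registry = new LinkedHashMap<>();
        for (Endpoint endpoint : endpoints) {
            MessageListenerContainerFactory factory = factoriesByBeanName.get(endpoint.factoryBeanName());
            if (factory == null) {
                throw new IllegalStateException("No factory bean named " + endpoint.factoryBeanName());
            }
            registry.put(endpoint.id(), factory.createContainer(endpoint));
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, MessageListenerContainerFactory> factories =
                Map.of("defaultFactory", new MessageListenerContainerFactory(FactoryOptions.defaults()));
        Map<String, MessageListenerContainer> registry = process(List.of(
                new Endpoint("orders", List.of("orders"), "defaultFactory", 20, null)), factories);
        ContainerOptions options = registry.get("orders").options();
        System.out.println(options.maxInflightMessagesPerQueue() + " " + options.pollTimeout()); // 20 PT10S
    }
}
```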
Please let me know if all this makes it any easier to understand. I can still try to create the diagrams if that's better.
Thanks!
Thanks @tomazfernandes, this definitely makes things easier. I imagine creating diagrams, although useful, can take a significant amount of your time, so I don't want to push for that. Would you be able to post some sample code here showing how to create a listener programmatically? I think this would explain the assembly line step by step even better.
Thanks @tomazfernandes, this definitely makes things easier. I imagine creating diagrams, although useful, can take a significant amount of your time, so I don't want to push for that.
Yeah, given the amount of work to be done if we can get by with this that’s better 😁
Would you be able to post some sample code here showing how to create a listener programmatically? I think this would explain the assembly line step by step even better.
Not sure I understand - do you mean creating a MessageListenerContainer without using the @SqsListener annotation?
Yes: creating a listener, starting to listen for messages, and stopping listening for messages. I think it is doable without using @SqsListener, right?
That's a good call. As is, we depend on some information gathered from the QueueAttributes to make the container work, and the logic that retrieves it is currently coupled to the annotation processing in the MessageHandler, so it'd be tricky to do it manually.
As part of moving things to the BeanPostProcessor, I'll keep that in mind - either dropping the requirement of having the QueueAttributes in the container, or perhaps gathering that information from AWS on container startup instead.
Also, since the container has many subcomponents, I'll move the defaults to the ContainerOptions so that users can easily change only the options they want instead of having to provide all of them.
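A minimal sketch of ContainerOptions owning its defaults, so users override only what they care about. The option names and default values are hypothetical; the only external fact relied on is that SQS ReceiveMessage returns at most 10 messages per call.

```java
import java.time.Duration;

public class ContainerOptionsDefaultsSketch {

    /** Hypothetical options class that owns its defaults, so callers override only what they need. */
    static final class ContainerOptions {
        private final int maxInflightMessagesPerQueue;
        private final Duration pollTimeout;
        private final int messagesPerPoll;

        private ContainerOptions(Builder builder) {
            this.maxInflightMessagesPerQueue = builder.maxInflightMessagesPerQueue;
            this.pollTimeout = builder.pollTimeout;
            this.messagesPerPoll = builder.messagesPerPoll;
        }

        static Builder builder() {
            return new Builder();
        }

        int maxInflightMessagesPerQueue() { return maxInflightMessagesPerQueue; }

        Duration pollTimeout() { return pollTimeout; }

        int messagesPerPoll() { return messagesPerPoll; }

        static final class Builder {
            // Defaults live here instead of being spread across the container's sub-components.
            private int maxInflightMessagesPerQueue = 10;
            private Duration pollTimeout = Duration.ofSeconds(10);
            private int messagesPerPoll = 10;

            Builder maxInflightMessagesPerQueue(int value) {
                this.maxInflightMessagesPerQueue = value;
                return this;
            }

            Builder pollTimeout(Duration value) {
                this.pollTimeout = value;
                return this;
            }

            Builder messagesPerPoll(int value) {
                // SQS ReceiveMessage returns at most 10 messages per call.
                if (value < 1 || value > 10) {
                    throw new IllegalArgumentException("messagesPerPoll must be between 1 and 10");
                }
                this.messagesPerPoll = value;
                return this;
            }

            ContainerOptions build() {
                return new ContainerOptions(this);
            }
        }
    }

    public static void main(String[] args) {
        // Override a single option; everything else keeps its default.
        ContainerOptions options = ContainerOptions.builder().pollTimeout(Duration.ofSeconds(5)).build();
        System.out.println(options.pollTimeout() + " " + options.messagesPerPoll()); // PT5S 10
    }
}
```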
Hey @maciejwalkowiak, and everyone! Just passing by to let you know that I’ve been working really hard on this and should have some exciting changes to push sometime next week.
I have, so far:
- Joined the modules
- Removed no longer necessary abstractions
- Thoroughly reviewed and simplified the overall design
- Reviewed and simplified generics typing
- Added integration tests with manual factory and container creation
- Implemented the basis for making it compatible with FIFO queues and ordered message processing
- Implemented multiple interceptors support
- Implemented BeanPostProcessing for annotations and flexible method invoking
- Many other small fixes and improvements
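For the FIFO item above, one common approach (an assumption here, not necessarily what the PR implements) is to split each received batch by MessageGroupId, process different groups concurrently, and process messages within a group strictly in order. A minimal sketch with a simplified `Message` record:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class FifoGroupingSketch {

    /** Simplified message: SQS FIFO messages carry a MessageGroupId attribute. */
    record Message(String groupId, String body) { }

    /** Splits a received batch by group id, preserving the original order inside each group. */
    static Map<String, List<Message>> groupByMessageGroup(List<Message> batch) {
        Map<String, List<Message>> groups = new LinkedHashMap<>();
        for (Message message : batch) {
            groups.computeIfAbsent(message.groupId(), id -> new ArrayList<>()).add(message);
        }
        return groups;
    }

    /** Processes groups concurrently, but messages of the same group strictly one after another. */
    static void process(List<Message> batch, Consumer<Message> listener) {
        List<CompletableFuture<Void>> perGroup = new ArrayList<>();
        for (List<Message> group : groupByMessageGroup(batch).values()) {
            perGroup.add(CompletableFuture.runAsync(() -> group.forEach(listener)));
        }
        CompletableFuture.allOf(perGroup.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
                new Message("customer-1", "a"), new Message("customer-2", "x"),
                new Message("customer-1", "b"), new Message("customer-1", "c"));
        System.out.println(groupByMessageGroup(batch).keySet()); // [customer-1, customer-2]
        // The listener must be thread-safe because different groups run concurrently.
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        process(batch, m -> seen.add(m.groupId() + ":" + m.body()));
        System.out.println(seen);
    }
}
```

The interleaving between groups in `seen` can vary from run to run; only the order within each group is guaranteed.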
There are still some parts I want to get done before pushing the code, and some of it is still a bit rough around the edges, so I’ll keep working on it a bit further before pushing if that’s ok.
I’m really excited about this and if you liked the previous design I’m sure you’ll like it a lot better with these changes!
Please let me know if there’s something else in the meantime.
Thanks!
@tomazfernandes thanks for the update! I am looking forward to the updates! Since we have just released M1, we can aim to have SQS support in M2. It does not need to be feature complete yet, but it should be enough to get feedback from the community. Thanks again for your efforts!
@tomazfernandes thanks for the update! I am looking forward to the updates!
Looking forward to presenting it too!
Since we have just released M1, we can aim to have SQS support in M2. It does not need to be feature complete yet, but it should be enough to get feedback from the community.
Sure, sounds good, do you have an approx. date in mind?
Thanks again for your efforts!
No worries, thanks for the opportunity!
Ideally we will release M2 in July/August. Once CloudWatch, Dynamo, and few other improvements are done we will fire the release. If SQS is there - even better, but if not, it can also go with M3.
For a load of 1K messages with a 1s processing time (Thread.sleep), the result was: 10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742
Running the same test with the current 2.4 version, the result was: 65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359
Made some performance tweaks to the container logic and this went down (or up 😄) to
14:02:03.234 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 7.44940653 seconds for sending and consuming 1000 messages. Messages / second: 134.23888144281474
Of course, the actual figures depend on a lot of factors, and perhaps we can arrange a proper load test sometime in the future with an app running on AWS, but I think it's fair to say it's a good improvement so far!
It's great and definitely good enough to not get stuck on the performance aspect :-)
Sure @maciejwalkowiak, thanks. I may have given the wrong impression on Twitter - I spent only a few hours on it while reviewing some of the container logic, so IMO it was definitely worth it for the performance improvements ☺️
I'm wrapping up a few things and should push the changes later today or tomorrow, and if everything looks ok we can look into the next steps.
After that, I'm thinking of opening a few issues with what I have in mind for backlog, so we can all take a look and discuss possible solutions and priorities, how does that sound?
Thanks!
Hi, I've pushed the code as promised. Yay! I'm really confident about this design, and looking forward to your review / feedback.
The integration test suite should do a good job of showing the features in action, and I've added all class-level and method-level javadocs. Let me know if you have any questions; I can probably also create a few diagrams now that we have a lot fewer classes and packages.
There are still quite a few shortcomings - no unit tests, FIFO support, or auto-configuration, for example. I should open a few issues tomorrow with what I have in mind, if that's ok, so we can better align priorities and solutions. And of course, feel free to open other issues too.
This is what I consider to be a first real version of the core design, so reviews are welcome @eddumelendez @MatejNedic, if you have the time.
Thanks a lot again for the opportunity! And please let me know if there's anything else.
A few interesting performance stats.
- Load test with 1K messages and a 1 second Thread.sleep to simulate processing load.
For a load of 1K messages with a 1s processing time (Thread.sleep), the result was: 10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742
Running the same test with the current 2.4 version, the result was: 65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359
Made some performance tweaks to the container logic and this went down (or up 😄) to 14:02:03.234 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 7.44940653 seconds for sending and consuming 1000 messages. Messages / second: 134.23888144281474
After a few further adjustments, this went down to:
22:40:25.899 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 5.785801345 seconds for sending and consuming 1000 messages. Messages / second: 172.8369054468461
That's more than 10x the current 2.4 version! 😄 I'm confident we can bring this further down in the future by at least a couple of seconds by batching acknowledgements.
Again, when we're closer to release, maybe it's worth doing some proper load tests with an app on AWS to get some proper metrics, but it looks like we're off to a good start!
- Improved startup time
By starting the message containers in parallel (configurable), the application startup time went down from 7.372 seconds to 3.259 seconds for 9 containers in the test suite (I've removed a couple of them in the final cleanup).
- Improved test suite performance
By running the tests in parallel, along with the previous changes, the suite execution time went down from 23 seconds to 9 seconds.
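The note in the first item about batching acknowledgements could be sketched as follows. SQS DeleteMessageBatch accepts at most 10 entries per request, so 1000 acknowledgements would become 100 delete calls instead of 1000. `AckBatcher` is hypothetical, and its `deleteBatch` consumer stands in for the real client call; a production version would also need to flush on a timer and handle partial batch failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AckBatchingSketch {

    /** SQS DeleteMessageBatch accepts at most 10 entries per request. */
    static final int MAX_BATCH_SIZE = 10;

    /** Collects receipt handles of processed messages and deletes them in batches. */
    static class AckBatcher {
        private final List<String> pending = new ArrayList<>();
        private final Consumer<List<String>> deleteBatch; // stand-in for a DeleteMessageBatch call

        AckBatcher(Consumer<List<String>> deleteBatch) {
            this.deleteBatch = deleteBatch;
        }

        /** Queues one acknowledgement and sends a request as soon as a full batch is available. */
        synchronized void acknowledge(String receiptHandle) {
            pending.add(receiptHandle);
            if (pending.size() == MAX_BATCH_SIZE) {
                flush();
            }
        }

        /** Sends whatever is pending, e.g. on a timer or when the container stops. */
        synchronized void flush() {
            if (!pending.isEmpty()) {
                deleteBatch.accept(List.copyOf(pending));
                pending.clear();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> requestSizes = new ArrayList<>();
        AckBatcher batcher = new AckBatcher(batch -> requestSizes.add(batch.size()));
        for (int i = 0; i < 25; i++) {
            batcher.acknowledge("receipt-" + i);
        }
        batcher.flush();
        System.out.println(requestSizes); // [10, 10, 5]
    }
}
```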
Most of these improvements were a result of reviewing the container design for this version and making sure everything worked as expected, so it didn't really take up much time on its own 😉
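The parallel container startup mentioned in the "Improved startup time" item can be sketched with `CompletableFuture.allOf`. The `Container` interface and the `parallel` flag are hypothetical, and `Thread.sleep` simulates a blocking startup step such as resolving queue attributes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelStartupSketch {

    /** Hypothetical container whose start() blocks, e.g. while resolving queue attributes. */
    interface Container {
        void start();
    }

    /** Starts all containers, either one by one or all at once, and waits until all have started. */
    static void startAll(List<Container> containers, boolean parallel) {
        if (!parallel) {
            containers.forEach(Container::start);
            return;
        }
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, containers.size()));
        try {
            CompletableFuture.allOf(containers.stream()
                    .map(c -> CompletableFuture.runAsync(c::start, executor))
                    .toArray(CompletableFuture[]::new)).join();
        } finally {
            executor.shutdown();
        }
    }

    /** Simulates a container whose startup takes the given time. */
    static Container slowContainer(long startupMillis) {
        return () -> {
            try {
                Thread.sleep(startupMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static void main(String[] args) {
        List<Container> containers = List.of(slowContainer(300), slowContainer(300), slowContainer(300));
        long start = System.nanoTime();
        startAll(containers, true);
        System.out.println("parallel: " + (System.nanoTime() - start) / 1_000_000 + " ms");
        start = System.nanoTime();
        startAll(containers, false);
        System.out.println("sequential: " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```

With three containers taking 300 ms each, the sequential path takes about 900 ms, while the parallel path takes about 300 ms plus thread startup overhead.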
Another quick note - I realize there are a lot of commits, and I can squash them if you prefer. But I think a single commit would be huge to review anyway, so I thought it'd be better to review from the 'files changed' pane; this way you can also see the incremental changes if you prefer.
Thanks
I've added a few issues - there are a couple more but I think that should be enough to start with. Of course, feel free to open new ones if you'd like.
I'd appreciate any input from the team, and please let me know if you have any questions or suggestions. Also, if anyone from the team feels like getting their hands dirty, contributions are welcome ☺️
Either way I'll keep working on these. Please let me know if there's anything else I can help with.
Thanks!