
Add SQS Support

Open tomazfernandes opened this issue 2 years ago • 46 comments

Resolves #344

Adds support for listening to SQS queues and handling messages. (WIP)

Features

  • See SqsIntegrationTests for usage examples and features
  • Provides almost all features from the previous version and a few new ones
  • Full Async support via CompletableFuture
  • Fully configurable via familiar Spring abstractions

Shortcomings (for now...)

  • No documentation or unit tests
  • Template not implemented
  • Support for FIFO queues not implemented
  • Lots of ground yet to cover 😄

I tried to follow the provided general guidelines and what I thought would make sense for the project. I'm ok with discussing and/or changing any of it, both design and implementation-wise.

I'll wait for feedback before adding anything else - hopefully this will be a good enough fit for us to work together on going forward. If it doesn't look right, or perhaps another direction is preferred, no worries, we can set it aside and let the team come up with something else later.

As a note, there's a messaging support module that provides a set of abstractions for adding support for any number of messaging systems - Kinesis comes to mind, but there sure are others in the AWS landscape. It was largely inspired by the design of the Spring Kafka project - that's a battle-tested design that most users and engineers should be comfortable with. If that seems like overkill for this project, it should be easy enough to merge the two modules and get rid of some abstractions.

This is a great project and I'm really thrilled to perhaps be playing a small part in building it! 😄

Thanks!

tomazfernandes avatar May 06 '22 06:05 tomazfernandes

Thanks a lot @tomazfernandes🔥! I will try to review it in the upcoming days. Regarding Kinesis - very likely we won't do anything about it since there is already Kinesis support in the Spring Cloud Stream project, but perhaps we can use it for MQTT #38.

After a very quick look - we've decided to drop @EnableXXX annotations and support only auto-configurations. There should still be a way for users who do not rely on auto-configurations to use the SQS integration, but then they should rather manually do what the auto-configuration does. With @EnableXXX we would have to register bean definitions, which causes headaches we wanted to avoid.

Since this is a critical and perhaps the most important integration we have, @MatejNedic @eddumelendez please take a look when you have time :)

maciejwalkowiak avatar May 06 '22 08:05 maciejwalkowiak

Quick question @maciejwalkowiak - there seems to be an error related to setting the AWS region on the CI build. Since this is a proverbial case of it works on my machine 😄, I wonder if it is a known issue with a known solution. If not I can look into it.

Thanks!

tomazfernandes avatar May 06 '22 16:05 tomazfernandes

Sounds good! Maybe we can work on a PoC and see what it looks like.

@arunkpatra plans to take a look so let's keep him in the loop.

Regarding AWS region on CI - it is somewhat on purpose. We try to leverage LocalStack as much as possible, and unless we hit a wall with an AWS feature that LocalStack doesn't support, all tests should run against it (you can take a look at how the other integration tests are set up).

I can't wait to dig deeper into this PR!

Regarding Sonar - feel free to question what it reports - we've set up a default configuration and not everything it reports necessarily is important from our POV.

One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible - it should be possible to adjust spring-integration-aws to the new implementation.

maciejwalkowiak avatar May 06 '22 17:05 maciejwalkowiak

@arunkpatra plans to take a look so let's keep him in the loop.

Sure, no hurry.

Regarding AWS region on CI - it is somewhat on purpose. We try to leverage LocalStack as much as possible, and unless we hit a wall with an AWS feature that LocalStack doesn't support, all tests should run against it (you can take a look at how the other integration tests are set up).

Sure, I'm using LocalStack for the tests. I guess this should be related to the changes in configuration properties you mentioned - I'll try that and update the PR.

I can't wait to dig deeper into this PR!

Cool, looking forward to your review too!

Regarding Sonar - feel free to question what it reports - we've set up a default configuration and not everything it reports necessarily is important from our POV.

I've actually enjoyed most of what it reported 😄 Of course, there were some less-than-critical things, but nothing to write home about. What I didn't change were some things related to duplicated code in legacy classes and a couple of Thread.sleep calls in the ITs for seeing how the system performs under load - we should probably remove those afterwards.

One more thing to keep in mind is that Spring Cloud AWS is used in https://github.com/spring-projects/spring-integration-aws - and while we don't need to be backward compatible - it should be possible to adjust spring-integration-aws to the new implementation.

I see, maybe we'll need to make a couple of adjustments, but at first glance it seems simple enough. I'll keep that in mind, and maybe we can involve them when it's appropriate.

Thanks!

tomazfernandes avatar May 06 '22 17:05 tomazfernandes

Sure, I'm using LocalStack for the tests. I guess this should be related to the changes in configuration properties you mentioned - I'll try that and update the PR.

Well, it's been a while since I last dealt with AWS configuration issues, and I'm not sure exactly how these properties are mapped by the framework. I'll take a closer look on Monday, and also implement auto-configuration for this - with that we'll probably have a properly configured SqsAsyncClient for the CI tests.

In the meantime, let me know if there are any further questions or concerns about any of this.

Thanks!

tomazfernandes avatar May 06 '22 19:05 tomazfernandes

@tomazfernandes I pushed a fix for the tests to your branch

maciejwalkowiak avatar May 06 '22 20:05 maciejwalkowiak

@tomazfernandes I pushed a fix for the tests to your branch

Nice, thanks @maciejwalkowiak!

tomazfernandes avatar May 06 '22 20:05 tomazfernandes

thanks for your contribution @tomazfernandes !

Thanks a lot for looking into this @eddumelendez!

I have left some small comments.

Sure, I'll address them, thanks.

I'll be checking Spring Kafka during the week in order to get more familiar with it.

Sounds great! Let me know if there's anything I can help with - I've been contributing to that project for a bit more than a year now and know the codebase reasonably well at this point - and surely would enjoy discussing both that and this design.

Also, I'm not sure how familiar you are with the previous version of the SQS integration - that would be a good thing to look at as I took a few classes and code from there. Although of course we can rewrite anything we feel could be different.

A few quick notes:

  • I tried to make use of what I thought was essential from the Spring Kafka project, but I didn't try to make an exact copy - perhaps there are some things I may have missed
  • I got 'creative' for the container part - there we have a large KafkaMessageListenerContainer class that handles most runtime logic, here I tried to break it down into smaller components, such as Ack, MessageProducer and MessageListener
  • Some of the design for this PR was constrained by the use of the MessageHandler interface, which was the main logic for the previous version, including annotation processing and runtime method resolving / invoking. I think we could look into decoupling those roles, separating annotation processing from method invoking.

Please let me know if there's anything else I can help with. Thanks!

tomazfernandes avatar May 09 '22 20:05 tomazfernandes

I've made a few adjustments to performance and ran some preliminary tests against AWS.

For a load of 1K messages with a 1s processing time (Thread.sleep), the result was: 10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742

Running the same test with the current 2.4 version, the result was: 65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359

Of course, that difference is kind of expected given the 10-threads-per-queue limitation of the previous version, but it should still be a happy gain for users. The performance is further improved when there are more than 1K messages in the queue, e.g. from a previously interrupted test.

I think we can expect better performance from a true async workload, e.g. the listener using other async methods such as perhaps saving something to DynamoDB or any async http calls, and returning a CompletableFuture<Void>, instead of a blocking load such as Thread.sleep. I'll look into implementing such a test.
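To make the blocking vs. async distinction concrete, here is a minimal sketch in plain Java (no Spring or AWS SDK types). `AsyncListener`, `blockingListener`, `asyncListener` and `processAll` are hypothetical names used only for illustration, not classes from this PR, and the scheduled delay merely stands in for a non-blocking I/O call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical listener contract: completion is signalled through the returned future.
interface AsyncListener {
    CompletableFuture<Void> onMessage(String payload);
}

final class AsyncVsBlockingSketch {

    // Blocking style: every in-flight message parks a pool thread for the whole delay.
    static AsyncListener blockingListener(Executor pool, long delayMillis) {
        return payload -> CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    // Async style: the delay is scheduled, so no thread is held while "waiting on I/O".
    static AsyncListener asyncListener(long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return payload -> CompletableFuture.runAsync(() -> { }, delayed);
    }

    // Dispatches all messages, waits for every future, and returns the elapsed milliseconds.
    static long processAll(AsyncListener listener, int messageCount) {
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = IntStream.range(0, messageCount)
                .mapToObj(i -> listener.onMessage("message-" + i))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With a small fixed pool, the blocking variant is bounded by pool size times processing time, while the async variant finishes in roughly one delay regardless of message count. Real gains would of course depend on the actual I/O involved.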

Also, probably there's room for performance improvement in the logic itself - we can look into that in the future.

Thanks!

tomazfernandes avatar May 10 '22 05:05 tomazfernandes

I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface by a BeanPostProcessor for annotation processing / Endpoint creation and InvocableHandlerMethod for method invocation.

With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture<Void> and batch processing listener methods that receive a Collection<Message> as an argument.
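As a rough illustration of that split, the sketch below separates assembly-time annotation scanning from runtime method invocation using plain reflection. It assumes nothing about the PR's actual classes: `@QueueListener`, `ListenerEndpoint`, `ListenerSupport` and `SampleListeners` are hypothetical, and in a Spring application the `scan` step would typically live in a `BeanPostProcessor#postProcessAfterInitialization` callback.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for @SqsListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface QueueListener {
    String value();
}

// Assembly-time result: what was found on the bean, with no invocation logic attached.
final class ListenerEndpoint {
    final String queueName;
    final Object bean;
    final Method method;

    ListenerEndpoint(String queueName, Object bean, Method method) {
        this.queueName = queueName;
        this.bean = bean;
        this.method = method;
    }
}

final class ListenerSupport {

    // Assembly time: collect one endpoint per annotated method of the bean.
    static List<ListenerEndpoint> scan(Object bean) {
        List<ListenerEndpoint> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            QueueListener annotation = method.getAnnotation(QueueListener.class);
            if (annotation != null) {
                method.setAccessible(true);
                endpoints.add(new ListenerEndpoint(annotation.value(), bean, method));
            }
        }
        return endpoints;
    }

    // Runtime: invoke the method and normalize sync and async listeners to a future.
    static CompletableFuture<Void> invoke(ListenerEndpoint endpoint, Object argument) {
        try {
            Object result = endpoint.method.invoke(endpoint.bean, argument);
            if (result instanceof CompletableFuture) {
                return ((CompletableFuture<?>) result).thenAccept(ignored -> { });
            }
            return CompletableFuture.completedFuture(null);
        } catch (ReflectiveOperationException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}

// Example bean: one single-message listener and one async batch listener.
class SampleListeners {
    final List<String> received = new CopyOnWriteArrayList<>();

    @QueueListener("orders")
    void onOrder(String payload) {
        received.add(payload);
    }

    @QueueListener("orders-batch")
    CompletableFuture<Void> onOrders(List<String> payloads) {
        return CompletableFuture.runAsync(() -> received.addAll(payloads));
    }
}
```

Because the invoker only sees a `Method` and an argument, the same runtime path can serve single-message, batch and async listeners.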

And after that, add auto-configuration with this new structure.

Since that should be quite a bit of work, I think I'll let the team catch up with what we already have first, to make sure we're all on the same page.

Thanks and please let me know if there's anything I can help with.

tomazfernandes avatar May 10 '22 23:05 tomazfernandes

I think my next move here would be taking yet another page from the Spring Kafka project and replacing the MessageHandler interface by a BeanPostProcessor for annotation processing / Endpoint creation and InvocableHandlerMethod for method invocation.

With that we'd further decouple assembly time logic from runtime logic, and also be able to handle use cases the MessageHandler interface is not well suited to, such as async listener methods returning a CompletableFuture<Void> and batch processing listener methods that receive a Collection<Message> as an argument.

And after that, add auto-configuration with this new structure.

Sounds like a reasonable thing to do. Go for it 👍

maciejwalkowiak avatar Jun 02 '22 21:06 maciejwalkowiak

The implementation looks like you definitely know what you're doing, but I am getting lost in the abstractions.

Yeah, well, I definitely mostly know what I'm doing 😄 The bits from Spring Kafka definitely help having a solid foundation to build upon.

I am 100% confident we should continue with this design but we either have to simplify it (for example by merging these two modules) or document it.

  1. Simplification - I understand the reasoning for keeping them separate. The only possible implementation I can think of is AWS IOT but not sure if this will really fit.
  2. Documentation - at least on the Javadoc level, it must come anyways. But I think it would be really really great if you could describe either with words or a diagram how all of these pieces are meant to work together.

Sure, that was something on my mind too, especially given this is a subproject rather than an entire project on its own.

I can add some javadocs and create some diagrams as you suggested to help everyone navigate the code more easily. I actually looked into creating the diagrams today, but failed to find a proper tool. I'll give IntelliJ's UML diagrams another shot. Otherwise I can also write in words somewhere how the components are related - it's not that hard after you get the bigger picture.

That should give everyone a better sense of how the components are related and what role they play at assembly time and at runtime. I think then we'll be able to get a better feel for the overall complexity and whether and how we should reduce it - including whether to join the modules or not.

I see that the container can have a single interceptor - perhaps we should consider adding more? For example, adding a possibility to measure execution time and send it via Micrometer, or adding tracing with Sleuth.

Yes, I'm aware of that. TBH, I just wanted to be able to implement the visibility extension feature, so I just added the one 😄 I'll change that to a list of interceptors.
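For reference, moving from one interceptor to a list could look roughly like the sketch below. `MessageInterceptor` and `InterceptorChain` are hypothetical names and the `String` payload is a simplification; the actual interceptor contract in the PR may well differ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical interceptor contract: may inspect or transform the message before the listener runs.
interface MessageInterceptor {
    CompletableFuture<String> intercept(String message);
}

final class InterceptorChain {
    private final List<MessageInterceptor> interceptors;

    InterceptorChain(List<MessageInterceptor> interceptors) {
        this.interceptors = List.copyOf(interceptors);
    }

    // Applies the interceptors in registration order, each one seeing the previous result.
    CompletableFuture<String> apply(String message) {
        CompletableFuture<String> result = CompletableFuture.completedFuture(message);
        for (MessageInterceptor interceptor : interceptors) {
            result = result.thenCompose(interceptor::intercept);
        }
        return result;
    }
}
```

A visibility-extension interceptor, a metrics interceptor and a tracing interceptor could then be registered side by side without knowing about each other.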

Please add auto-configuration (we do not need @EnableSqs)

Sure. This will come after the BeanPostProcessor change, since that impacts how the feature is bootstrapped.

and tests. I imagine integration tests cover a big chunk of this change and I do not mean that we have to have a unit test for each class, but I do believe some pieces here should be unit tested.

Sure, I'll implement UTs and ITs after this is a bit more mature. Same with javadocs and documentation.

tomazfernandes avatar Jun 02 '22 23:06 tomazfernandes

@maciejwalkowiak, I've added simple javadocs to the assembly-time components just to give a jump start on this.

Below is a high-level view of the design, focusing first on the assembly-time components and flows, which I think are harder to understand. Feel free to ask if you (or anyone) have any questions or suggestions.

Afterwards we can dive into the runtime components, and some other aspects of the design.

I hope this makes sense to everyone 😄

Component types

  • Assembly-time components are invoked during application startup to create the runtime components. Those are the components in the config and endpoint packages, such as MessageListenerContainerFactory and Endpoint.

  • Runtime components - basically everything used at runtime, such as method invocation, payload conversion, and the MessageListenerContainer with its many sub-components.

Abstraction levels

I think of the abstractions in three levels:

  • Interface level - top level abstractions. As a rule of thumb, should only interact with other interfaces.

  • Abstract level - base classes that implement the interfaces. Should only interact with other abstract-level components and interfaces.

  • Concrete level - lower level abstractions that represent concrete logic for a given messaging system (SQS).

In the messaging-support module we have the first two layers of abstraction, and in the sqs module we have the third.
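The three levels can be pictured with a small plain-Java sketch. The type names mirror the ones used in this thread, but the method signatures and the `SqsMessageListenerContainer` body are assumptions made purely for illustration and not the PR's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Interface level: top-level contracts that reference only other interfaces.
interface ContainerOptions {
    int maxMessagesPerPoll();
}

interface MessageListenerContainer {
    void start();
    void stop();
    boolean isRunning();
}

// Abstract level: shared lifecycle logic, written only against the interfaces above.
abstract class AbstractMessageListenerContainer implements MessageListenerContainer {
    private final ContainerOptions options;
    private boolean running;

    protected AbstractMessageListenerContainer(ContainerOptions options) {
        this.options = options;
    }

    protected ContainerOptions options() {
        return options;
    }

    @Override
    public final synchronized void start() {
        if (!running) {
            doStart();
            running = true;
        }
    }

    @Override
    public final synchronized void stop() {
        if (running) {
            doStop();
            running = false;
        }
    }

    @Override
    public final synchronized boolean isRunning() {
        return running;
    }

    protected abstract void doStart();

    protected abstract void doStop();
}

// Concrete level: the only place that knows about a specific messaging system.
final class SqsMessageListenerContainer extends AbstractMessageListenerContainer {
    private final String queueName;
    final List<String> events = new ArrayList<>(); // records lifecycle calls for the example

    SqsMessageListenerContainer(String queueName, ContainerOptions options) {
        super(options);
        this.queueName = queueName;
    }

    @Override
    protected void doStart() {
        events.add("polling " + queueName + " (max " + options().maxMessagesPerPoll() + " per poll)");
    }

    @Override
    protected void doStop() {
        events.add("stopped " + queueName);
    }
}
```

Supporting another messaging system would then only mean adding a new concrete level on top of the same interface and abstract levels.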

tomazfernandes avatar Jun 03 '22 03:06 tomazfernandes

Assembly time flow

This is a bit more complicated to illustrate in writing, but hopefully it'll be helpful.

  • SqsEndpointMessageHandler

The SqsEndpointMessageHandler bean detects @SqsListener annotations and extracts the information that will be used to populate the SQSEndpoint instances. It implements the EndpointRegistry interface, which means it'll store the endpoints for later processing.

It also gathers the methods that will be invoked to handle messages at runtime.

This is the part that should be replaced by the BeanPostProcessor approach.

  • EndpointProcessor

Then the EndpointProcessor fetches the EndpointRegistry bean (in our case the message handler above) and feeds the Endpoint instances to the corresponding MessageListenerContainerFactory instance. In this version, the Endpoint has the factoryBeanName field - Spring Kafka handles this differently, in a more complex way.

The factory creates the appropriate MessageListenerContainer instance, which is then registered in the ListenerContainerRegistry.

  • MessageListenerContainerFactory

The MessageListenerContainerFactory bean name comes from the Endpoint instance, which can either have been specified through the @SqsListener annotation or have the default value.

Each factory will have a FactoryOptions instance (user provided or default). The AbstractFactory should only know about the AbstractFactoryOptions and FactoryOptions, so any configuration needed for the AbstractFactory to do its work should be there. Same goes for the AbstractEndpoint.

The factory will combine configuration from the FactoryOptions with configuration from the Endpoint to create a ContainerOptions instance.

Then it creates the MessageListenerContainer instance.

  • MessageListenerContainer

This is the class that will be used at runtime to fetch and process the actual messages. This part is more straightforward I guess. Currently, all queue names provided in the same @SqsListener annotation are handled by the same container, although each will have its own MessageProducer.

Again, the AbstractMessageListenerContainer should only know about AbstractContainerOptions.

tomazfernandes avatar Jun 03 '22 03:06 tomazfernandes

So the relationship is:

Interface level: MessageListenerContainerFactory (processes) Endpoint (with) FactoryOptions (to create) MessageListenerContainer (with) ContainerOptions

Abstract level: Same but beginning with Abstract 😄

Concrete level: Same but beginning with SQS
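The assembly-time flow described above can be approximated in a few lines of plain Java. This is only a sketch under simplifying assumptions: options are left out, the registries are plain maps, and `SimpleEndpoint`, `process` and `containers` are hypothetical names; only the overall shape (endpoints are routed by factory bean name, and the resulting containers are registered) follows the description.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Interface level, reduced to the minimum needed to show the flow.
interface Endpoint {
    String queueName();
    String factoryBeanName();
}

interface MessageListenerContainer {
    String queueName();
}

interface MessageListenerContainerFactory {
    MessageListenerContainer create(Endpoint endpoint);
}

// Simple value implementation of Endpoint, as an annotation processor might produce it.
final class SimpleEndpoint implements Endpoint {
    private final String queueName;
    private final String factoryBeanName;

    SimpleEndpoint(String queueName, String factoryBeanName) {
        this.queueName = queueName;
        this.factoryBeanName = factoryBeanName;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String factoryBeanName() {
        return factoryBeanName;
    }
}

final class EndpointProcessor {
    private final Map<String, MessageListenerContainerFactory> factoriesByName;
    private final Map<String, MessageListenerContainer> containerRegistry = new LinkedHashMap<>();

    EndpointProcessor(Map<String, MessageListenerContainerFactory> factoriesByName) {
        this.factoriesByName = factoriesByName;
    }

    // Feeds each endpoint to the factory it names and registers the resulting container.
    void process(List<? extends Endpoint> endpoints) {
        for (Endpoint endpoint : endpoints) {
            MessageListenerContainerFactory factory = factoriesByName.get(endpoint.factoryBeanName());
            if (factory == null) {
                throw new IllegalStateException("No factory named " + endpoint.factoryBeanName());
            }
            containerRegistry.put(endpoint.queueName(), factory.create(endpoint));
        }
    }

    Map<String, MessageListenerContainer> containers() {
        return Collections.unmodifiableMap(containerRegistry);
    }
}
```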

Please let me know if all this makes it any easier to understand. I can still try to create the diagrams if that's better.

Thanks!

tomazfernandes avatar Jun 03 '22 03:06 tomazfernandes

Thanks @tomazfernandes, this definitely makes things easier. I imagine creating diagrams, although useful, can take a significant amount of your time, so I don't want to push for that. Would you be able to post some sample code here showing how to create a listener programmatically? I think this would explain the assembly line step by step even better.

maciejwalkowiak avatar Jun 03 '22 04:06 maciejwalkowiak

Thanks @tomazfernandes, this definitely makes things easier. I imagine creating diagrams, although useful, can take a significant amount of your time, so I don't want to push for that.

Yeah, given the amount of work to be done if we can get by with this that’s better 😁

Would you be able to post some sample code here showing how to create a listener programmatically? I think this would explain the assembly line step by step even better.

Not sure I understand what you mean - you mean creating a MessageListenerContainer without using the @SqsListener annotation?

tomazfernandes avatar Jun 03 '22 05:06 tomazfernandes

Yes, creating a listener, start listening for messages, stop listening for messages. I think it is doable without using @SqsListener right?

maciejwalkowiak avatar Jun 03 '22 08:06 maciejwalkowiak

Yes, creating a listener, start listening for messages, stop listening for messages. I think it is doable without using @SqsListener right?

That's a good call. As is, we depend on some information gathered from the QueueAttributes to make the container work, and the logic that retrieves that is currently coupled to the annotation processing in the MessageHandler, so it'd be tricky to do it manually.

As part of moving things to the BeanPostProcessor I'll keep that in mind - either dropping the requirement of having the QueueAttributes in the container, or perhaps gathering that information from AWS on container startup instead.

Also, since the container has many subcomponents, I'll move the defaults to the ContainerOptions so that users can easily change only the options they choose to instead of having to provide all of them.
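To give an idea of the programmatic usage being discussed (create, start, stop, with defaults living in the options), here is a self-contained sketch. It is not the PR's API: `InMemoryListenerContainer` and this `ContainerOptions` shape are hypothetical, and an in-memory `BlockingQueue` stands in for the SQS queue so the example runs without AWS.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical options object: every setting has a default, callers override only what they need.
final class ContainerOptions {
    private Duration pollTimeout = Duration.ofMillis(100);
    private int maxMessagesPerPoll = 10;

    ContainerOptions pollTimeout(Duration pollTimeout) {
        this.pollTimeout = pollTimeout;
        return this;
    }

    ContainerOptions maxMessagesPerPoll(int maxMessagesPerPoll) {
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        return this;
    }

    Duration getPollTimeout() {
        return pollTimeout;
    }

    int getMaxMessagesPerPoll() {
        return maxMessagesPerPoll;
    }
}

final class InMemoryListenerContainer {
    private final BlockingQueue<String> source; // stands in for the SQS queue
    private final Consumer<String> listener;
    private final ContainerOptions options;
    private volatile boolean running;
    private Thread worker;

    InMemoryListenerContainer(BlockingQueue<String> source, Consumer<String> listener, ContainerOptions options) {
        this.source = source;
        this.listener = listener;
        this.options = options;
    }

    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        worker = new Thread(this::pollLoop, "listener-container");
        worker.setDaemon(true);
        worker.start();
    }

    synchronized void stop() {
        running = false;
        if (worker != null) {
            worker.interrupt();
            worker = null;
        }
    }

    boolean isRunning() {
        return running;
    }

    // Polls with a timeout, drains up to the configured batch size, and hands each message to the listener.
    private void pollLoop() {
        while (running) {
            try {
                String first = source.poll(options.getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                List<String> batch = new ArrayList<>();
                batch.add(first);
                source.drainTo(batch, options.getMaxMessagesPerPoll() - 1);
                batch.forEach(listener);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Usage would be along the lines of `new InMemoryListenerContainer(queue, System.out::println, new ContainerOptions().pollTimeout(Duration.ofMillis(20)))`, followed by `start()` and later `stop()`.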

tomazfernandes avatar Jun 03 '22 14:06 tomazfernandes

Hey @maciejwalkowiak, and everyone! Just passing by to let you know that I’ve been working really hard on this and should have some exciting changes to push sometime next week.

I have, so far:

  • Joined the modules
  • Removed no longer necessary abstractions
  • Thoroughly reviewed and simplified the overall design
  • Reviewed and simplified generics typing
  • Added integration tests with manual factory and container creation
  • Implemented the basis for making it compatible with FIFO queues and ordered message processing
  • Implemented multiple interceptors support
  • Implemented BeanPostProcessing for annotations and flexible method invoking
  • Many other small fixes and improvements

There are still some parts I want to get done before pushing the code, and some of it is still a bit rough around the edges, so I’ll keep working on it a bit further before pushing if that’s ok.

I’m really excited about this and if you liked the previous design I’m sure you’ll like it a lot better with these changes!

Please let me know if there’s something else in the meantime.

Thanks!

tomazfernandes avatar Jun 16 '22 17:06 tomazfernandes

@tomazfernandes thanks for the update! I am looking forward to the changes! Since we have just released M1, we can try to aim for SQS support in M2. It does not need to be feature complete yet - but it should be enough to get feedback from the community. Thanks again for your efforts!

maciejwalkowiak avatar Jun 16 '22 20:06 maciejwalkowiak

@tomazfernandes thanks for the update! I am looking forward to the changes!

Looking forward to presenting it too!

Since we have just released M1, we can try to aim for SQS support in M2. It does not need to be feature complete yet - but it should be enough to get feedback from the community.

Sure, sounds good, do you have an approx. date in mind?

Thanks again for your efforts!

No worries, thanks for the opportunity!

tomazfernandes avatar Jun 16 '22 20:06 tomazfernandes

Ideally we will release M2 in July/August. Once CloudWatch, Dynamo, and a few other improvements are done we will fire the release. If SQS is there - even better, but if not, it can also go with M3.

maciejwalkowiak avatar Jun 17 '22 07:06 maciejwalkowiak

For a load of 1K messages with a 1s processing time (Thread.sleep), the result was: 10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742

Running the same test with the current 2.4 version, the result was: 65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359

Made some performance tweaks to the container logic and this went down (or up 😄) to 14:02:03.234 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 7.44940653 seconds for sending and consuming 1000 messages. Messages / second: 134.23888144281474

Of course, the actual figures depend on a lot of factors, and perhaps we can arrange a proper load test sometime in the future with an app running on AWS, but I think it's fair to say it's a good improvement so far!

tomazfernandes avatar Jun 17 '22 17:06 tomazfernandes

It's great and definitely good enough to not get stuck on the performance aspect :-)

maciejwalkowiak avatar Jun 20 '22 07:06 maciejwalkowiak

It's great and definitely good enough to not get stuck on the performance aspect :-)

Sure @maciejwalkowiak, thanks. I may have given the wrong impression on twitter - I spent only a few hours on it while reviewing some of the container logic, so IMO definitely worth the performance improvements ☺️

I'm wrapping up a few things and should push the changes later today or tomorrow, and if everything looks ok we can look into the next steps.

After that, I'm thinking of opening a few issues with what I have in mind for backlog, so we can all take a look and discuss possible solutions and priorities, how does that sound?

Thanks!

tomazfernandes avatar Jun 20 '22 19:06 tomazfernandes

Hi, I've pushed the code as promised. Yay! I'm really confident about this design, and looking forward to your review / feedback.

The integration test suite should do a good job showing the features in action, and I've added all class-level and method-level javadocs. Let me know if you have any questions, and also I can probably create a few diagrams now that we have a lot fewer classes and packages.

There are still quite a few shortcomings - no unit tests, FIFO, or autoconfiguration for example. I should open a few issues tomorrow with what I have in mind if that's ok, so we can better align priorities and solutions. And of course feel free to open other issues too.

This is what I consider to be a first real version of the core design, so reviews are welcome @eddumelendez @MatejNedic, if you have the time.

Thanks a lot again for the opportunity! And please let me know if there's anything else.

tomazfernandes avatar Jun 22 '22 02:06 tomazfernandes

A few interesting performance stats.

  • Load test with 1K messages and 1 second Thread.sleep to simulate processing load.

For a load of 1K messages with a 1s processing time (Thread.sleep), the result was: 10.241481457 seconds for sending and consuming 1000 messages. Messages / second: 97.6421237687742

Running the same test with the current 2.4 version, the result was: 65.50966774 seconds for sending and consuming 1000 messages. Messages / second: 15.26492248394359

Made some performance tweaks to the container logic and this went down (or up 😄) to 14:02:03.234 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 7.44940653 seconds for sending and consuming 1000 messages. Messages / second: 134.23888144281474

After a few further adjustments, this went down to:

22:40:25.899 [main] INFO i.a.cloud.sqs.SqsIntegrationTests - 5.785801345 seconds for sending and consuming 1000 messages. Messages / second: 172.8369054468461

That's more than 10x the current 2.4 version! 😄 I'm confident we can bring this further down in the future by at least a couple seconds by batching acknowledgements.

Again, when we're closer to release maybe it's worth doing some proper load tests with an app on AWS to get some proper metrics, but looks like we're off to a good start!

  • Improved startup time

By starting the message containers in parallel (configurable), the application startup time went down from 7.372 seconds to 3.259 seconds for 9 containers in the test suite (I've removed a couple of them in the final cleanup).

  • Improved test suite performance

By running the tests in parallel, along with the previous changes, the suite execution time went down from 23 seconds to 9 seconds.

Most of these improvements were a result of reviewing the container design for this version and making sure everything worked as expected, so it didn't really take up much time on its own 😉
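For anyone curious about the parallel startup mentioned above, the general idea can be sketched as follows. `Startable` and `ContainerStarter` are hypothetical names, not the PR's classes; the point is only that waiting on all start futures makes startup time approach the slowest container rather than the sum of all of them.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical minimal lifecycle contract for the example.
interface Startable {
    void start();
}

final class ContainerStarter {

    // Starts the containers one after another: total time is the sum of the start times.
    static void startSequentially(List<? extends Startable> containers) {
        containers.forEach(Startable::start);
    }

    // Starts every container on the executor and waits for all of them to finish starting.
    static void startInParallel(List<? extends Startable> containers, Executor executor) {
        CompletableFuture<?>[] starts = containers.stream()
                .map(container -> CompletableFuture.runAsync(container::start, executor))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(starts).join();
    }
}
```

Keeping a sequential option around makes sense for cases where start order matters, which is presumably why the behaviour is configurable.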

tomazfernandes avatar Jun 22 '22 03:06 tomazfernandes

Another quick note - I realize there are a lot of commits, I can squash them if you prefer. But I think a single commit would be huge to review anyway, so I thought it'd be better to review from the 'files changed' pane, and this way you can see the incremental changes if you prefer.

Thanks

tomazfernandes avatar Jun 22 '22 04:06 tomazfernandes

I've added a few issues - there are a couple more but I think that should be enough to start with. Of course, feel free to open new ones if you'd like.

I'd appreciate any input from the team, and please let me know if you have any questions or suggestions. Also, if anyone from the team feels like getting their hands dirty, contributions are welcome ☺️

Either way I'll keep working on these. Please let me know if there's anything else I can help with.

Thanks!

tomazfernandes avatar Jun 22 '22 16:06 tomazfernandes