
[FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305) Amazon SQS Sink Connector

Open 19priyadhingra opened this issue 1 year ago • 1 comment

Purpose of the change

Implements the Amazon SQS Sink Connector
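For readers skimming the PR, here is a rough usage sketch of what wiring the new sink into a DataStream job could look like. It is only an illustration: the builder entry point, setter names, configuration key, and queue URL below are assumptions modeled on the other AWS sinks in this repository, not the confirmed public API of this PR.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.sqs.sink.SqsSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class SqsSinkUsageSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> events = env.fromElements("event-1", "event-2");

        // Client properties: the "aws.region" key is an assumption borrowed from the other AWS connectors.
        Properties clientProperties = new Properties();
        clientProperties.setProperty("aws.region", "us-east-1");

        // Builder and setter names below are hypothetical placeholders for the sink's real API.
        SqsSink<String> sink =
                SqsSink.<String>builder()
                        .setSerializationSchema(new SimpleStringSchema()) // how each record becomes an SQS message body
                        .setSqsUrl("https://sqs.us-east-1.amazonaws.com/000000000000/example-queue") // hypothetical queue URL
                        .setSqsClientProperties(clientProperties)
                        .build();

        events.sinkTo(sink);
        env.execute("sqs-sink-usage-sketch");
    }
}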

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests
  • Added integration tests
  • Manually verified by running the SQS connector on a local Flink cluster

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • [ ] Dependencies have been added or upgraded
  • [ ] Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • [ ] Serializers have been changed
  • [x] New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

19priyadhingra avatar May 10 '24 22:05 19priyadhingra

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar May 10 '24 22:05 boring-cyborg[bot]

Thanks for the efforts. Could we remove the ticket link from the PR title? It should be automatically linked using "Autolink".

We need to add documentation for the new feature as well

  1. Updated
  2. Sure, I will create documentation for this

19priyadhingra avatar May 21 '24 21:05 19priyadhingra

Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except the documentation item, which is still in progress. Is there a package where I am supposed to add the documentation?

19priyadhingra avatar May 28 '24 20:05 19priyadhingra

@19priyadhingra 👋 I've had the opportunity to build and try this connector. Want to share some feedback:

  • Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom ElementConverter.
  • There are a lot of spotless violations; you probably want to run mvn spotless:apply.

sap1ens avatar May 29 '24 18:05 sap1ens

@19priyadhingra 👋 I've had the opportunity to build and try this connector. Want to share some feedback:

  • Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom ElementConverter.
  • There are a lot of spotless violations; you probably want to run mvn spotless:apply.

@sap1ens, thanks a lot for the feedback.

  1. Base64 encoding is what we were using in our production environment; the intention was that base64 encoding guarantees no invalid characters are present in the message, but I now understand that others might not need it. Would you recommend keeping it as a plain String?

  2. I have a follow-up query on the spotless violations: how did you run that? Whenever I try it, the build succeeds with no errors and I can see "spotless skipped" in the build logs (screenshot attached). Is there some setting I need to enable to get spotless working for me? Sorry for the basic question, I am working on this package for the first time :)

19priyadhingra avatar May 29 '24 19:05 19priyadhingra

Would you recommend keeping it as a plain String?

Yes! And if someone needs base64 encoding they can encode it in the SerializationSchema.
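To make that concrete, here is a minimal sketch of how a user who still wants base64 could wrap an existing schema once the sink accepts a plain SerializationSchema. The wrapper class name is made up for illustration; only the Flink SerializationSchema interface itself is assumed from the framework.

import org.apache.flink.api.common.serialization.SerializationSchema;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical wrapper that base64-encodes whatever the inner schema produces. */
public class Base64SerializationSchema<T> implements SerializationSchema<T> {

    private final SerializationSchema<T> inner;

    public Base64SerializationSchema(SerializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context); // keep the inner schema's lifecycle intact
    }

    @Override
    public byte[] serialize(T element) {
        // Encode the inner schema's bytes as base64 text, mirroring the behaviour the
        // connector originally applied to every message.
        byte[] raw = inner.serialize(element);
        return Base64.getEncoder().encodeToString(raw).getBytes(StandardCharsets.UTF_8);
    }
}

A user who needs base64 could then pass something like new Base64SerializationSchema<>(new SimpleStringSchema()) to the sink, while everyone else keeps plain strings.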

I have a follow-up query on the spotless violations: how did you run that? Whenever I try it, the build succeeds with no errors and I can see "spotless skipped" in the build logs (screenshot attached). Is there some setting I need to enable to get spotless working for me?

Hmm, I ran mvn clean package -DskipTests in the root of the project and ended up seeing a bunch of spotless violations. Running mvn spotless:apply changed a lot of files in this PR.

sap1ens avatar May 29 '24 20:05 sap1ens

Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?

vahmed-hamdy avatar May 31 '24 15:05 vahmed-hamdy

Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?

Yes @vahmed-hamdy, I am working hard on it. As shown in the screenshots above, weirdly, all of the spotless checks are getting skipped for me: "mvn clean package -DskipTests" succeeds with no spotless warnings. I'm not sure what Maven/IntelliJ setting I need to enable in my local build to see these errors. I'm trying to get help from others on this.

Thanks!

19priyadhingra avatar May 31 '24 18:05 19priyadhingra

Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?

I was finally able to make spotless work in my local workspace after downgrading my JDK version from 21 to 8. Is that a prerequisite for this package? Should we note that in the README? It would help others save time debugging such issues later.

19priyadhingra avatar Jun 03 '24 22:06 19priyadhingra

Thanks for the efforts. Could we remove the ticket link from the PR title? It should be automatically linked using "Autolink".

We need to add documentation for the new feature as well

Added a documentation commit to this PR.

19priyadhingra avatar Jun 04 '24 02:06 19priyadhingra

Would you recommend keeping it as a plain String?

Yes! And if someone needs base64 encoding they can encode it in the SerializationSchema.

I have a follow-up query on the spotless violations: how did you run that? Whenever I try it, the build succeeds with no errors and I can see "spotless skipped" in the build logs (screenshot attached). Is there some setting I need to enable to get spotless working for me?

Hmm, I ran mvn clean package -DskipTests in the root of the project and ended up seeing a bunch of spotless violations. Running mvn spotless:apply changed a lot of files in this PR.

  1. Removed Base64 encoding and fixed spotless issues

19priyadhingra avatar Jun 04 '24 02:06 19priyadhingra

We seem to have quite a few `.` in the class folders. Can we change them to `/` instead? e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`

hlteoh37 avatar Jun 04 '24 09:06 hlteoh37

We seem to have quite a few `.` in the class folders. Can we change them to `/` instead? e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`

Good catch! Fixed

19priyadhingra avatar Jun 05 '24 06:06 19priyadhingra

It'd also be great to mention what permissions are needed for this connector to work. E.g., is sqs:SendMessage sufficient?

sap1ens avatar Jun 06 '24 18:06 sap1ens

It'd also be great to mention what permissions are needed for this connector to work. E.g., is sqs:SendMessage sufficient?

Good point! Yes, sqs:SendMessage is sufficient; I've updated the documentation accordingly.

19priyadhingra avatar Jun 09 '24 06:06 19priyadhingra

Hi, when is this planned to be released? @19priyadhingra

nehahmoe avatar Jun 26 '24 11:06 nehahmoe

Hi, when is this planned to be released? @19priyadhingra

Unfortunately, right now it is stuck waiting for approval; there are no actions pending on my end at this stage.

19priyadhingra avatar Jun 26 '24 17:06 19priyadhingra

Waiting patiently for this to be approved.

duncandee avatar Jun 27 '24 07:06 duncandee

spotless seems to be failing.

hlteoh37 avatar Jul 12 '24 08:07 hlteoh37

spotless seems to be failing.

Oops, sorry!! Fixed now

19priyadhingra avatar Jul 12 '24 16:07 19priyadhingra

@19priyadhingra The tests seem to be failing. Can we please take a look?

Also - it would be good if we squash the commits!

hlteoh37 avatar Jul 23 '24 12:07 hlteoh37

@19priyadhingra The tests seem to be failing. Can we please take a look?

Also - it would be good if we squash the commits!

  1. Hi Hong, I dug deeper into the only failing test, elementConverterWillOpenSerializationSchema (failure logs: https://paste.amazon.com/show/dhipriya/1721770192). It complains that TestSinkInitContext can't be cast to sink2.Sink$InitContext. I understand why it is failing but am not sure how to fix it. The issue started appearing after moving to 1.19.1, where TestSinkInitContext was changed: the new TestSinkInitContext (https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java) implements WriterInitContext, whereas the old one (https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java) implemented Sink.InitContext. WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext, not the required org.apache.flink.api.connector.sink2.Sink.InitContext (see the sketch after this list). One easy option would be to remove this test if it is not adding much value; the other fix might require a change in flink-connector-base to keep the old TestSinkInitContext. For now I have removed that test, please let me know your thoughts on this.

  2. Squashed the commits
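Here is the sketch referenced in point 1: a minimal, self-contained illustration of why the cast fails. The interfaces below are stripped-down stand-ins named after the Flink types from the linked files, not the real classes.

// Stand-in for org.apache.flink.api.connector.sink2.InitContext
interface InitContext {}

// Stand-in for org.apache.flink.api.connector.sink2.Sink.InitContext (what the test casts to)
interface SinkInitContext extends InitContext {}

// Stand-in for org.apache.flink.api.connector.sink2.WriterInitContext (what the 1.19.1 test context implements)
interface WriterInitContext extends InitContext {}

class TestSinkInitContext implements WriterInitContext {}

public class CastMismatchSketch {
    public static void main(String[] args) {
        InitContext ctx = new TestSinkInitContext();
        // Compiles because SinkInitContext shares the InitContext supertype, but fails at
        // runtime with a ClassCastException because TestSinkInitContext no longer
        // implements the Sink.InitContext branch of the hierarchy.
        SinkInitContext cast = (SinkInitContext) ctx;
        System.out.println(cast);
    }
}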

19priyadhingra avatar Jul 25 '24 18:07 19priyadhingra

The integration tests are failing because LocalStack is unable to create the SQS queue; this looks like the issue mentioned here: https://github.com/aws/aws-sdk-java/issues/3055

But weirdly, the same test case passed earlier. Is anybody aware of any recent changes in this repo related to LocalStack that could cause this?

cc: @hlteoh37

19priyadhingra avatar Jul 31 '24 05:07 19priyadhingra

@19priyadhingra This may potentially be related to the recent AWS SDK upgrade in #149

z3d1k avatar Jul 31 '24 12:07 z3d1k

#149

Thanks for the info @z3d1k. Reverting that change in my local workspace actually fixed the test case. Do you have any suggestions on how I can fix my test case without reverting that PR?

19priyadhingra avatar Aug 01 '24 05:08 19priyadhingra

Do you have any suggestions on how I can fix my test case without reverting that PR?

Hi @19priyadhingra, we can either debug the LocalStack issue, or consider reducing the AWS version for our ITCase against LocalStack - we need to track the fix with a JIRA though

hlteoh37 avatar Aug 01 '24 09:08 hlteoh37

consider reducing the AWS version for our ITCase against LocalStack

@hlteoh37 , I am sorry, I didn't understand this part. Do you mean replacing this dependency:

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>

with this one?

<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
    <version>2.20.144</version>
</dependency>

I tried this, but the build started failing because conflicting aws-sdk versions would then exist at the same time.

Also, there is no direct aws-sdk dependency I can see in the e2e test package that I could downgrade; it takes its aws-sdk dependency from the flink-connector-sqs package.

cc: @z3d1k

19priyadhingra avatar Aug 02 '24 04:08 19priyadhingra

Do you have any suggestions on how I can fix my test case without reverting that PR?

Hi @19priyadhingra, we can either debug the LocalStack issue, or consider reducing the AWS version for our ITCase against LocalStack - we need to track the fix with a JIRA though

Added the following in the sqs-e2e-sqs pom file:

    <dependencyManagement>
        <dependencies>
            <!-- Override the aws-sdk dependency to an older version to temporarily fix the
                 'not able to create sqs localstack' error seen with the newer version -->
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.20.144</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

19priyadhingra avatar Aug 02 '24 15:08 19priyadhingra

@hlteoh37 All workflows passed on the last trigger. Is there any other blocker to merging this? Thanks!

19priyadhingra avatar Aug 07 '24 17:08 19priyadhingra

Yes, this PR will be super helpful for us as well 👀

VittoriDavide avatar Aug 08 '24 17:08 VittoriDavide