flink-connector-aws
[FLINK-35305](https://issues.apache.org/jira/browse/FLINK-35305) Amazon SQS Sink Connector
Purpose of the change
Implements the Amazon SQS Sink Connector
Verifying this change
This change added tests and can be verified as follows:
- Added unit tests
- Added integration tests
- Manually verified by running the SQS connector on a local Flink cluster.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [x] New feature has been introduced
- If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
Thanks for the efforts. Could we remove the ticket link from the PR title? It should be automatically linked using "Autolink".
We need to add documentation for the new feature as well
- Updated
- Sure, will create documentation for this
Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except the documentation item, which is still in progress. Is there a package where I am supposed to add the documentation?
@19priyadhingra 👋 I've had the opportunity to build and try this connector. Want to share some feedback:
- Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`.
- There are a lot of `spotless` violations; you probably want to run `mvn spotless:apply`.
> @19priyadhingra 👋 I've had the opportunity to build and try this connector. Want to share some feedback:
> - Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`.
> - There are a lot of `spotless` violations; you probably want to run `mvn spotless:apply`.
@sam1ens, thanks a lot for the feedback.
- Base64 encoding is what we were using in our production environment; the intention was that base64 encoding guarantees no invalid characters are present in the message. But now I understand that others might not need it. Would you recommend keeping it as a plain String?
- I have a follow-up question on the spotless violations. How did you run that? Whenever I try, the build succeeds with no errors and I can see "spotless skipped" in the build logs [attached screenshot]. Is there some setting I need to enable to make spotless work for me?

Sorry for the basic question, I am working in this package for the first time :)
> Would you recommend keeping it as a plain String?
Yes! And if someone needs base64 encoding they can encode it in the `SerializationSchema`.
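As a hedged illustration of that suggestion (the class and method names below are mine, not part of the connector's API), base64 encoding can live entirely on the user side using the JDK's `java.util.Base64` codec, instead of the sink applying it unconditionally:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: a user-side helper that base64-encodes message bodies before
// they are handed to the sink. Names are illustrative, not flink-connector-aws API.
public final class Base64BodySchema {
    // Encode a message body the way the PR originally did for every message.
    public static String encode(String body) {
        return Base64.getEncoder().encodeToString(body.getBytes(StandardCharsets.UTF_8));
    }

    // Consumers that opted into base64 decode on the receiving side.
    public static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}
```

Wiring `encode` into a `SerializationSchema` (or a custom `ElementConverter`) keeps the base64 choice with the user rather than hard-coding it in the connector.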
> I have a follow up query on spotless violations. How did you run that? Whenever I try, the build succeeds with no error and I can see "spotless skipped" in the build logs [attached screenshot]. Is there some setting I need to enable to make spotless work for me?
Hmm, I ran `mvn clean package -DskipTests` in the root of the project and ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` changed a lot of files in this PR.
Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?
> Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?
Yes @vahmed-hamdy, I am trying hard on it. As shown in the screenshots above, weirdly, all of the spotless checks are getting skipped for me: `mvn clean package -DskipTests` succeeds with no spotless warnings. I am not sure what Maven/IntelliJ setting I need to enable in my local build to see these errors. I am trying to get help from others on this.
Thanks!
> Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?
I was finally able to make spotless work in my local workspace after downgrading my JDK version from 21 to 8. Is that a prerequisite for this package? Should we note that in the README? It would help others save time debugging such issues later.
> Thanks for the efforts. Could we remove the ticket link from the PR title? It should be automatically linked using "Autolink".
>
> We need to add documentation for the new feature as well
Added Documentation commit in this PR
> Would you recommend keeping it as a plain String?
>
> Yes! And if someone needs base64 encoding they can encode it in the `SerializationSchema`.

> I have a follow up query on spotless violations. How did you run that? Whenever I try, the build succeeds with no error and I can see "spotless skipped" in the build logs [attached screenshot]. Is there some setting I need to enable to make spotless work for me?
>
> Hmm, I ran `mvn clean package -DskipTests` in the root of the project and ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` changed a lot of files in this PR.
- Removed Base64 encoding and fixed spotless issues
We seem to be having quite a few `.` in the class folder paths. Can we change them to `/` instead?
e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`
> We seem to be having quite a few `.` in the class folder paths. Can we change them to `/` instead? e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`
Good catch! Fixed
It'd also be great to mention what permissions are needed for this connector to work. E.g., is `sqs:SendMessage` sufficient?
> It'd also be great to mention what permissions are needed for this connector to work. E.g., is `sqs:SendMessage` sufficient?
Good point! Yes, `sqs:SendMessage` is sufficient; I updated the documentation accordingly.
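For the documentation, a minimal IAM policy along these lines would grant that permission. This is a sketch, not text from the PR's docs, and the queue ARN is a placeholder:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:us-east-1:123456789012:my-sink-queue"
    }
  ]
}
```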
Hi, when is this planned to be released? @19priyadhingra
> Hi, when is this planned to be released? @19priyadhingra
Unfortunately, right now it is stuck waiting for approval; there are no actions pending on my end at this stage.
Waiting patiently for this to be approved.
spotless seems to be failing.
> spotless seems to be failing.
oops sorry!! Fixed now
@19priyadhingra The tests seem to be failing. Can we please take a look?
Also - it would be good if we squash the commits!
> @19priyadhingra The tests seem to be failing. Can we please take a look?
> Also - it would be good if we squash the commits!
- Hi Hong, I dug deeper into the only failing test, `elementConverterWillOpenSerializationSchema` (failure logs: https://paste.amazon.com/show/dhipriya/1721770192). It complains that `TestSinkInitContext` cannot be cast to `sink2.Sink$InitContext`. I understand why it is failing, but not how to fix it. The issue started after 1.19.1, where `TestSinkInitContext` was changed: the new `TestSinkInitContext` (https://github.com/apache/flink/blob/release-1.19.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java) implements `WriterInitContext`, which extends `org.apache.flink.api.connector.sink2.InitContext` rather than the required `org.apache.flink.api.connector.sink2.Sink.InitContext`, whereas the old `TestSinkInitContext` (https://github.com/apache/flink/blob/release-1.18.1/flink-connectors/flink-connect[…]pache/flink/connector/base/sink/writer/TestSinkInitContext.java) implements `Sink.InitContext`. One easy option would be to remove this test if it is not adding much value; otherwise the fix might require a change in flink-connector-base to keep the old `TestSinkInitContext`. For now I have removed the test; please let me know your thoughts.
- Squashed the commits
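The cast failure described above can be reproduced in miniature with stand-in interfaces (all names here are illustrative stand-ins, not the real Flink types): once the test context implements the new sibling interface, it is no longer an instance of the old nested one, even though both extend a common base.

```java
// Miniature model of the interface change (illustrative names, not Flink's):
interface InitContext {}                            // plays sink2.InitContext
interface OldSinkInitContext extends InitContext {} // plays Sink.InitContext (1.18.x)
interface WriterInitContext extends InitContext {}  // plays WriterInitContext (1.19.x)

// The 1.19.1-style test context implements the new parent interface.
class TestContext implements WriterInitContext {}

public class InitContextCastDemo {
    // Models the cast the failing test performs, e.g. (Sink.InitContext) context.
    public static boolean castableToOldInterface() {
        InitContext ctx = new TestContext();
        // Sibling interfaces share a base but are unrelated, so this is false,
        // and an explicit cast would throw ClassCastException at runtime.
        return ctx instanceof OldSinkInitContext;
    }
}
```

This is why the options are either changing the test (or removing it) or restoring a context that implements the old interface in flink-connector-base.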
Integration tests are failing because LocalStack is unable to create the SQS queue; this issue looks like the one mentioned here: https://github.com/aws/aws-sdk-java/issues/3055
But weirdly, the same test case passed earlier. Is anybody aware of any recent changes in this repo related to LocalStack that could cause this?
cc: @hlteoh37
@19priyadhingra This may potentially be related to recent AWS SDK upgrade in #149
Thanks for the info @z3d1k. Reverting that change in my local workspace did fix the test case. Do you have any suggestion on how I can fix my test case without reverting that PR?
> Do you have any suggestion on how I can fix my test case without reverting that PR?
Hi @19priyadhingra, we can either debug the LocalStack issue, or consider downgrading the AWS SDK version for our ITCase against LocalStack; we need to track the fix with a JIRA though.
> consider downgrading the AWS SDK version for our ITCase against LocalStack
@hlteoh37, I am sorry, I didn't understand this part. Do you mean replacing (1)

```xml
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>
```

with (2)

```xml
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
    <version>2.20.144</version>
</dependency>
```

I tried this, but the build started failing because conflicting aws-sdk versions then exist at the same time.
Also, there is no direct aws-sdk dependency I can see in the e2e test package that I could downgrade; it takes the aws-sdk dependency from the flink-connector-sqs package.
cc: @z3d1k
> Do you have any suggestion on how I can fix my test case without reverting that PR?
>
> Hi @19priyadhingra, we can either debug the LocalStack issue, or consider downgrading the AWS SDK version for our ITCase against LocalStack; we need to track the fix with a JIRA though.
Added the following in the sqs-e2e-sqs pom file:

```xml
<dependencyManagement>
    <dependencies>
        <!-- Override the aws-sdk dependency to an older version to temporarily fix
             the 'not able to create sqs localstack' error seen with the newer version -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.20.144</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```
@hlteoh37 All workflows passed on the last trigger. Is there any other blocker to merging this? Thanks!
yes this PR will be super helpful for us as well 👀