flink-connector-rabbitmq
[FLINK-20628] RabbitMQ Connector using FLIP-27 Source API
What is the purpose of the change
This pull request ports the RabbitMQ connector implementation to the new Connector API described in FLIP-27 and FLIP-143. It includes both a source and a sink, each supporting at-most-once, at-least-once, and exactly-once behavior.
This pull request closes the following issues (separate tickets for the RabbitMQ connector source and sink): FLINK-20628 and FLINK-21373
Brief change log
- Source and Sink use RabbitMQ's Java Client API to interact with RabbitMQ
- The RabbitMQ Source reads messages from a queue
  - At-least-once
    - Messages are acknowledged on checkpoint completion
  - Exactly-once
    - Messages are acknowledged in a transaction
    - The user has to set correlation ids for deduplication
- The RabbitMQ Sink publishes messages to a queue
  - At-least-once
    - Unacknowledged messages are resent on checkpoints
  - Exactly-once
    - Messages between two checkpoints are published in a transaction
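The source-side behavior listed above (acknowledge on checkpoint completion, deduplicate by correlation id) can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: `AckChannel` and `CheckpointAckTracker` are hypothetical names, the method names only mirror Flink's checkpoint hooks, and the broker is reduced to a single `ack` callback. In the exactly-once case the acknowledgements would additionally be wrapped in a broker transaction, which is omitted here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical stand-in for the broker channel; not part of the connector or the RabbitMQ client. */
interface AckChannel {
    void ack(long deliveryTag);
}

/** Sketch: hold back acknowledgements until the checkpoint covering them has completed. */
class CheckpointAckTracker {
    private final AckChannel channel;
    // Delivery tags received since the last snapshot.
    private List<Long> pendingTags = new ArrayList<>();
    // Delivery tags frozen per checkpoint id, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<Long>> tagsPerCheckpoint = new TreeMap<>();
    // Correlation ids already emitted downstream (assumption: unbounded here, never pruned).
    private final Set<String> seenCorrelationIds = new HashSet<>();

    CheckpointAckTracker(AckChannel channel) {
        this.channel = channel;
    }

    /** Records the delivery and returns true if the message should be emitted downstream. */
    boolean onMessage(long deliveryTag, String correlationId) {
        // Duplicates are still acknowledged later so the broker can discard them.
        pendingTags.add(deliveryTag);
        return seenCorrelationIds.add(correlationId);
    }

    /** Freezes the tags received so far under the given checkpoint id. */
    void snapshotState(long checkpointId) {
        tagsPerCheckpoint.put(checkpointId, pendingTags);
        pendingTags = new ArrayList<>();
    }

    /** Acknowledges every tag that belongs to this checkpoint or an earlier one. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Long>> completed = tagsPerCheckpoint.headMap(checkpointId, true);
        for (List<Long> tags : completed.values()) {
            for (long tag : tags) {
                channel.ack(tag);
            }
        }
        // Clearing the view also removes the entries from the backing map.
        completed.clear();
    }
}
```

With this shape, a message that arrives after `snapshotState(n)` is not acknowledged by `notifyCheckpointComplete(n)`; it waits for the next checkpoint, so a failure in between causes the broker to redeliver it.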
Verifying this change
This change added tests and can be verified as follows:
All changes are within the flink-connectors/flink-connector-rabbitmq2/ module. The added integration tests can be found in the org.apache.flink.connector.rabbitmq2.source and org.apache.flink.connector.rabbitmq2.sink packages in the respective test directories.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (don't know)
- The runtime per-record code paths (performance sensitive): (don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (don't know)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs)
Co-authored-by: Yannik Schroeder [email protected]
Co-authored-by: Jan Westphal [email protected]
This is a copy from the original PR (https://github.com/apache/flink/pull/15140) against the Flink repository.
@MartijnVisser We are not exactly sure what has to be part of the root-pom.
@pscls I think you've done a good job already with the root-pom; it looks like the one we currently have for Elasticsearch. I've just approved the run, so we can also see how the build behaves. When I tried it locally, it complained about https://github.com/pscls/flink-connector-rabbitmq/blob/new-api-connector/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java#L124 having a whitespace, but now the tests are running for me.
I'll work on finding someone who can help with the review for this.
@pscls Can you have a look at the failing build? It's a checkstyle error.
@MartijnVisser @pscls I noticed that GitBox for flink-connector-rabbitmq sent emails to [email protected]. Is this expected?
@wanglijie95 No. Most likely this happens because the PR was created without, and is not yet using, the ASF config defined in https://github.com/apache/flink-connector-rabbitmq/blob/main/.asf.yaml
@pscls The CI fails due to spotless; can you fix that? (By running mvn spotless:apply)
@MartijnVisser I have nothing to commit after running mvn spotless:apply.
@pscls Weird. Could you push once more, since the logs are no longer available?
You can avoid a lot of boilerplate by using the preliminary flink-connector-parent pom as shown here: https://github.com/apache/flink-connector-elasticsearch/commit/6e30d5d63d395b2f731418c34f5838231dcab6b8
Hi @pscls, thank you very much for the contribution. I notice that this PR has not been updated for a long time. Would you like to continue advancing it? Once this PR is completed, FLINK-25380 will be introduced. Looking forward to your opinion. Thanks.
Would you like to continue advancing it?
@RocMarshal Do you want to take this over?
Hi @MartijnVisser, glad to get your attention. In fact, you have already assigned this ticket to me on https://issues.apache.org/jira/browse/FLINK-20628. I am working on it now. Thank you!
I am working on it now.
Any update to report from your end on this, @RocMarshal?
Hi @MartijnVisser, still in progress. I'll post an update at the end of this week.