flink-connector-rabbitmq

[FLINK-20628] RabbitMQ Connector using FLIP-27 Source API

Open · pscls opened this pull request 2 years ago · 15 comments

What is the purpose of the change

This pull request ports the RabbitMQ connector implementation to the new connector APIs described in FLIP-27 (Source) and FLIP-143 (Sink). Both the source and the sink support at-most-once, at-least-once, and exactly-once behavior.

This pull request closes the following issues (separate tickets for the RabbitMQ connector Source and Sink): FLINK-20628 and FLINK-21373.

Brief change log

  • Source and Sink use the RabbitMQ Java Client API to interact with RabbitMQ
  • The RabbitMQ Source reads messages from a queue
    • At-least-once
      • Messages are acknowledged on checkpoint completion
    • Exactly-once
      • Messages are acknowledged in a transaction
      • The user has to set correlation ids for deduplication
  • The RabbitMQ Sink publishes messages to a queue
    • At-least-once
      • Unacknowledged messages are resent on checkpoints
    • Exactly-once
      • Messages between two checkpoints are published in a transaction
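The source-side behavior in the change log (acknowledge on checkpoint completion, deduplicate by correlation id) can be illustrated with a small in-memory sketch. This is not the connector's code: the class CheckpointAckTracker and its methods are hypothetical, the LongConsumer callback stands in for the RabbitMQ client's Channel#basicAck, and the transactional acknowledgement used for exactly-once is not shown. The method names only loosely mirror Flink's snapshot/notifyCheckpointComplete lifecycle.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch (not the connector's API): buffers RabbitMQ delivery
 * tags per checkpoint and acknowledges them only after that checkpoint completes.
 */
final class CheckpointAckTracker {
    // Delivery tags received since the last snapshot.
    private List<Long> pending = new ArrayList<>();
    // Tags frozen into a snapshot, keyed by checkpoint id, waiting for completion.
    private final TreeMap<Long, List<Long>> awaitingAck = new TreeMap<>();
    // Correlation ids already emitted, used to drop redelivered duplicates.
    // Unbounded here for brevity; a real implementation would need to bound it
    // and keep it in checkpointed state.
    private final Set<String> seenCorrelationIds = new HashSet<>();
    // Stand-in for Channel#basicAck(deliveryTag, false) (assumption).
    private final LongConsumer ack;

    CheckpointAckTracker(LongConsumer ack) {
        this.ack = ack;
    }

    /** Records the delivery; returns true if the message should be emitted downstream. */
    boolean onMessage(long deliveryTag, String correlationId) {
        // Every delivery, including a duplicate, must eventually be acknowledged.
        pending.add(deliveryTag);
        // A redelivery with an already-seen correlation id is not emitted again.
        return seenCorrelationIds.add(correlationId);
    }

    /** Freezes the tags received so far into the given checkpoint. */
    void snapshotState(long checkpointId) {
        awaitingAck.put(checkpointId, pending);
        pending = new ArrayList<>();
    }

    /** Acknowledges all tags belonging to this checkpoint and any earlier ones. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<Long>> done = awaitingAck.headMap(checkpointId, true);
        for (List<Long> tags : done.values()) {
            for (long tag : tags) {
                ack.accept(tag);
            }
        }
        // Clearing the head-map view removes the entries from awaitingAck.
        done.clear();
    }
}
```

Acknowledging only after the checkpoint completes means a failure before that point leaves the messages unacknowledged, so RabbitMQ redelivers them (at-least-once). The correlation-id check is what allows those redeliveries to be filtered out.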

Verifying this change

This change added tests and can be verified as follows:

All changes are within the flink-connectors/flink-connector-rabbitmq2/ module. The added integration tests can be found in the org.apache.flink.connector.rabbitmq2.source and org.apache.flink.connector.rabbitmq2.sink packages in the respective test directories.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (don't know)
  • The runtime per-record code paths (performance sensitive): (don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (don't know)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs)

Co-authored-by: Yannik Schroeder [email protected]
Co-authored-by: Jan Westphal [email protected]

pscls, May 23 '22 18:05

This is a copy from the original PR (https://github.com/apache/flink/pull/15140) against the Flink repository.

pscls, May 23 '22 18:05

@MartijnVisser We are not sure exactly what has to be part of the root POM.

pscls, May 23 '22 18:05

@pscls I think you've done a good job already with the root POM; it looks like the one we currently have for Elasticsearch. I've just approved the run, so we can also see how the build behaves. When I tried it locally, it complained about a whitespace issue in https://github.com/pscls/flink-connector-rabbitmq/blob/new-api-connector/flink-connector-rabbitmq/src/main/java/org/apache/flink/connector/rabbitmq/sink/RabbitMQSink.java#L124, but now the tests are running for me.

I'll work on finding someone who can help with the review for this.

MartijnVisser, May 24 '22 12:05

@pscls Can you have a look at the failing build? It's a checkstyle error.

MartijnVisser, Jun 20 '22 07:06

@MartijnVisser @pscls I noticed that GitBox for flink-connector-rabbitmq sent emails to [email protected]. Is that expected?

wanglijie95, Jun 20 '22 07:06

@wanglijie95 No. Most likely this happens because the PR was created before, or does not yet use, the ASF config defined in https://github.com/apache/flink-connector-rabbitmq/blob/main/.asf.yaml

MartijnVisser, Jun 20 '22 07:06

@pscls The CI fails due to spotless; can you fix that? (By running mvn spotless:apply)

MartijnVisser, Aug 03 '22 13:08

@MartijnVisser I have nothing to commit after running mvn spotless:apply.

pscls, Aug 09 '22 08:08

@pscls Weird. Could you push once more, since the logs are no longer available?

MartijnVisser, Aug 31 '22 11:08

You can avoid a lot of boilerplate by using the preliminary flink-connector-parent pom as shown here: https://github.com/apache/flink-connector-elasticsearch/commit/6e30d5d63d395b2f731418c34f5838231dcab6b8

zentol, Sep 30 '22 09:09

Hi @pscls, thank you very much for the contribution. I notice that this PR has not been updated for a long time. Would you like to continue working on it? Once this PR is completed, FLINK-25380 will be introduced. Looking forward to your opinion. Thanks.

RocMarshal, Jun 12 '23 10:06

@RocMarshal Do you want to take this over?

MartijnVisser, Jun 29 '23 14:06

Hi @MartijnVisser, glad to get your attention. In fact, you have already assigned this ticket to me in https://issues.apache.org/jira/browse/FLINK-20628. I am working on it now. Thank you!

RocMarshal, Jun 29 '23 15:06

Any update to report from your end on this @RocMarshal ?

MartijnVisser, Jul 28 '23 11:07

Hi @MartijnVisser, still in progress. I'll post an update at the end of this week.

RocMarshal, Jul 29 '23 04:07