flink-connector-pulsar icon indicating copy to clipboard operation
flink-connector-pulsar copied to clipboard

[FLINK-37436] Fix that flink-connector-pulsar used a incorrect API when dynamically creating topics by DynamicTopicRouter

Open poorbarcode opened this issue 8 months ago • 1 comments

Purpose of the change

Background 1: dynamically create a Pulsar Topic by Flink connector-pulsar

Flink connector-pulsar provided a way to dynamically create a Pulsar Topic when DynamicTopicRouter returns a non-existing one. see also: flink-connector-pulsar/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java at main · apache/flink-connector-pulsar

  • pulsarClient.getPartitionsForTopic(topic) will create a topic automatically if it does not exist.

Background 2:  how dynamically created topics in Pulsar Server

  • There is a config named allowAutoTopicCreationType, which can be set to partitioned or non-partitioned
  • If it was set partitioned, Pulsar will create a partitioned topic with {defaultNumPartitions} partitions. For example, Pulsar will create topics named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, and create a relationship between them, which indicates they are in a same partitioned topic.
  • If it was set non-partitioned, Pulsar will create a non-partitioned topic. Pulsar will create topics named {tenant}/{namespace}/{topic name}, which does not include a suffix partition-{num}.

Issue:

  • if pulsarClient.getPartitionsForTopic(topic) get a param {tenant}/{namespace}/{topic name}-partition-0, which includes the suffix partition-0, Pulsar will create a non-partitioned topic named {tenant}/{namespace}/{topic name}-partition-0
  • After you call pulsarClient.getPartitionsForTopic(topic) with a param {tenant}/{namespace}/{topic name}-partition-1, you will get two partitions named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, but there is no relationship record between them.

Relates to https://issues.apache.org/jira/projects/FLINK/issues/FLINK-37436

Brief change log

Fix the incorrect API calling.

Verifying this change

This change is a minor change and don't have any tests.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • [ ] Dependencies have been added or upgraded
  • [ ] Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • [ ] Serializers have been changed
  • [ ] New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

poorbarcode avatar Mar 07 '25 12:03 poorbarcode

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Mar 07 '25 12:03 boring-cyborg[bot]