flink-connector-pulsar
flink-connector-pulsar copied to clipboard
[FLINK-37436] Fix that flink-connector-pulsar used a incorrect API when dynamically creating topics by DynamicTopicRouter
Purpose of the change
Background 1: dynamically create a Pulsar Topic by Flink connector-pulsar
Flink connector-pulsar provided a way to dynamically create a Pulsar Topic when DynamicTopicRouter returns a non-existing one. see also: flink-connector-pulsar/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java at main · apache/flink-connector-pulsar.
pulsarClient.getPartitionsForTopic(topic)will create a topic automatically if it does not exist.
Background 2: how dynamically created topics in Pulsar Server
- There is a config named
allowAutoTopicCreationType, which can be set topartitionedornon-partitioned - If it was set
partitioned, Pulsar will create a partitioned topic with{defaultNumPartitions}partitions. For example, Pulsar will create topics named{tenant}/{namespace}/{topic name}-partition-0and{tenant}/{namespace}/{topic name}-partition-1, and create a relationship between them, which indicates they are in a same partitioned topic. - If it was set
non-partitioned, Pulsar will create a non-partitioned topic. Pulsar will create topics named{tenant}/{namespace}/{topic name}, which does not include a suffixpartition-{num}.
Issue:
- if
pulsarClient.getPartitionsForTopic(topic)get a param{tenant}/{namespace}/{topic name}-partition-0, which includes the suffixpartition-0, Pulsar will create a non-partitioned topic named{tenant}/{namespace}/{topic name}-partition-0 - After you call
pulsarClient.getPartitionsForTopic(topic)with a param{tenant}/{namespace}/{topic name}-partition-1, you will get two partitions named{tenant}/{namespace}/{topic name}-partition-0and{tenant}/{namespace}/{topic name}-partition-1, but there is no relationship record between them.
Relates to https://issues.apache.org/jira/projects/FLINK/issues/FLINK-37436
Brief change log
Fix the incorrect API calling.
Verifying this change
This change is a minor change and don't have any tests.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with
@Public(Evolving)) - [ ] Serializers have been changed
- [ ] New feature has been introduced
- If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)