[FLINK-28595][Connectors/kafka] KafkaSource should not read metadata of unmatched regex topics
What is the purpose of the change
KafkaSource should not read metadata for topics that do not match the subscribed regex. This change optimizes the topic-filtering logic accordingly.
Brief change log
The filter logic moves from the TopicPatternSubscriber#getSubscribedTopicPartitions method to the KafkaSubscriberUtils#getAllTopicMetadata method.
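To illustrate the idea, here is a minimal sketch of filtering topic names before requesting their metadata. It is not the code from this PR: `MetadataClient`, `FakeClient`, and `getMatchingTopicMetadata` are hypothetical names standing in for the Kafka admin client and the utility method, and partition counts stand in for the full topic metadata.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class TopicFilterSketch {

    /** Hypothetical stand-in for the Kafka admin client; not a real Kafka or Flink API. */
    interface MetadataClient {
        Set<String> listTopicNames();

        /** Returns topic name -> partition count for the requested topics only. */
        Map<String, Integer> describeTopics(Set<String> topicNames);
    }

    /** Filters topic names first, so metadata is requested only for matching topics. */
    static Map<String, Integer> getMatchingTopicMetadata(
            MetadataClient client, Pattern topicPattern) {
        Set<String> matching = new TreeSet<>();
        for (String name : client.listTopicNames()) {
            if (topicPattern.matcher(name).matches()) {
                matching.add(name);
            }
        }
        return client.describeTopics(matching);
    }

    /** In-memory client that records which topics were actually described. */
    static class FakeClient implements MetadataClient {
        final Map<String, Integer> partitionCounts = new HashMap<>();
        final List<String> described = new ArrayList<>();

        @Override
        public Set<String> listTopicNames() {
            return new TreeSet<>(partitionCounts.keySet());
        }

        @Override
        public Map<String, Integer> describeTopics(Set<String> topicNames) {
            Map<String, Integer> result = new TreeMap<>();
            for (String name : topicNames) {
                described.add(name);
                result.put(name, partitionCounts.get(name));
            }
            return result;
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        client.partitionCounts.put("orders-eu", 3);
        client.partitionCounts.put("orders-us", 2);
        client.partitionCounts.put("payments", 5);

        Map<String, Integer> metadata =
                getMatchingTopicMetadata(client, Pattern.compile("orders-.*"));
        System.out.println(metadata);          // only the two orders-* topics
        System.out.println(client.described);  // "payments" was never described
    }
}
```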
Verifying this change
This change is already covered by existing tests, such as KafkaSubscriberTest#testTopicPatternSubscriber.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
CI report:
- a82be8d684f6366f58c0dca227535d5a038d8cd6 Azure: SUCCESS
We should support the case where no Pattern exists and a caller needs the entire list of topics with metadata.
Hi @amazhar1, thanks for your comment. When no Pattern exists, the user should use the setTopics method; a TopicListSubscriber object is then used under the hood.
I was thinking that KafkaSubscriberUtils is a general purpose utility class that any other code can call. While today it is only invoked by the Regex subscriber, tomorrow something else might need a list of topics, with or without a regex filter.
Yeah, you are right. Maybe we can move the getAllTopicMetadata method to the TopicPatternSubscriber class.
I like it.
One alternative is to move this function back into KafkaSubscriberUtils so that both the unfiltered method and this new filtered method are available to all (future) callers.
That makes sense. It would be friendlier. I've changed it that way
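For readers following the thread, offering both variants side by side could look roughly like the sketch below. It is an assumption-laden illustration, not the code in this PR: the class name, the `getTopicMetadata` overload, and the `Supplier`/`Function` parameters (standing in for the admin client's list and describe calls) are all hypothetical, and partition counts stand in for real topic metadata.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;

public class SubscriberUtilsSketch {

    /** Unfiltered variant: describes every topic the lister returns. */
    static Map<String, Integer> getAllTopicMetadata(
            Supplier<Set<String>> lister,
            Function<Set<String>, Map<String, Integer>> describer) {
        return describer.apply(lister.get());
    }

    /** Filtered variant: describes only the topics whose names match the pattern. */
    static Map<String, Integer> getTopicMetadata(
            Supplier<Set<String>> lister,
            Function<Set<String>, Map<String, Integer>> describer,
            Pattern topicPattern) {
        Set<String> matching = new TreeSet<>();
        for (String name : lister.get()) {
            if (topicPattern.matcher(name).matches()) {
                matching.add(name);
            }
        }
        return describer.apply(matching);
    }

    public static void main(String[] args) {
        // Hypothetical cluster state: topic name -> partition count.
        Map<String, Integer> cluster = new TreeMap<>();
        cluster.put("logs-a", 1);
        cluster.put("logs-b", 2);
        cluster.put("metrics", 4);

        Supplier<Set<String>> lister = cluster::keySet;
        Function<Set<String>, Map<String, Integer>> describer = names -> {
            Map<String, Integer> out = new TreeMap<>();
            for (String n : names) {
                out.put(n, cluster.get(n));
            }
            return out;
        };

        System.out.println(getAllTopicMetadata(lister, describer));
        // prints {logs-a=1, logs-b=2, metrics=4}
        System.out.println(getTopicMetadata(lister, describer, Pattern.compile("logs-.*")));
        // prints {logs-a=1, logs-b=2}
    }
}
```

Keeping the unfiltered method alongside the filtered one means a caller without a Pattern does not have to construct a match-all regex.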
Thank you very much for doing this, Jason. Do you think you could add a unit test for this method to make it easier for the assigned reviewer to review and merge? I have not looked, but there must be a mock for the AdminClient.
This change is already covered by existing tests; you can refer to KafkaSubscriberTest#testTopicPatternSubscriber.
@flinkbot run azure
@PatrickRen Hi, please help review this PR when you have free time. Thanks.
@JasonLeeCoding The Flink Kafka connector resides in its own repository. If this code change is still relevant, please open the PR in https://github.com/apache/flink-connector-kafka