flink-connector-pulsar
[FLINK-37299][Connector/Pulsar] Flink stateless startup cannot contin…
Purpose of the change
Controls whether Flink, when it starts without state (no checkpoint or savepoint to restore from), resumes from the position recorded by the consumer group or from the position specified by StartCursor.
1. If 'pulsar.source.resetSubscriptionCursor' = 'true', each time a Flink task is restarted, it consumes from the position specified by the StartCursor configuration.
2. If 'pulsar.source.resetSubscriptionCursor' = 'false', each restart resumes from the position recorded by the consumer group, no matter what value StartCursor is set to.
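The two cases above can be sketched as a small decision function. This is an illustrative model only, not the connector's code: the class `StartPositionSketch`, the method `resolveStartPosition`, and the string position values are hypothetical names made up for this example. It also assumes the consumer group already has a recorded position; the description does not say what happens when it has none.

```java
// Hypothetical model of the behaviour described in this PR; not connector code.
public class StartPositionSketch {

    /**
     * Picks the position a stateless start would consume from.
     *
     * @param resetSubscriptionCursor value of 'pulsar.source.resetSubscriptionCursor'
     * @param startCursor             position configured through StartCursor
     * @param recordedPosition        position recorded by the consumer group
     *                                (assumed to exist in this sketch)
     */
    static String resolveStartPosition(
            boolean resetSubscriptionCursor, String startCursor, String recordedPosition) {
        // true  -> always honour StartCursor on restart
        // false -> always resume from the consumer group's recorded position
        return resetSubscriptionCursor ? startCursor : recordedPosition;
    }

    public static void main(String[] args) {
        // The position strings are placeholders, not real Pulsar message IDs.
        System.out.println(resolveStartPosition(true, "earliest", "recorded"));
        System.out.println(resolveStartPosition(false, "earliest", "recorded"));
    }
}
```

With the flag set to true the sketch returns the StartCursor value; with false it returns the recorded position, regardless of what StartCursor says.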
Brief change log
The value of pulsar.source.resetSubscriptionCursor determines whether Flink, when it starts without state, resumes from the position recorded by the consumer group or from the position specified by StartCursor.
Verifying this change
This is a minor change and does not include any tests.
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with @Public(Evolving))
- [ ] Serializers have been changed
- [ ] New feature has been introduced
- If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
@yebai1105 If the StartCursor for consumption is set to "earliest", will there be any issues just by changing this part? Looking at the code, there is no other place where the cursor is set except here. It seems that the "subscriptionInitialPosition" needs to be set when creating the consumer in the PulsarPartitionSplitReader.
Awesome work, congrats on your first merged pull request!