pulsar-flink
[BUG] FlinkPulsarSource parallelism change
It looks like FlinkPulsarSource doesn’t handle a change in parallelism correctly.
I looked through the code but didn’t find anything that handles a change in parallelism.
I am using the key hash feature (client and Pulsar version 2.7.0).
Solution Suggestion
Our current use case is a single topic per job (it can be partitioned or non-partitioned).
I thought of a simple and fast solution that keeps track of the current Pulsar cursor; it can be found in the attached file. I would like to hear comments on it (it uses only the current Pulsar API).
The algorithm is explained in the PartitionedTopicCursor class documentation (in the attached file).
- Currently the solution supports only a single topic (partitioned or non-partitioned) and does not support all Flink features, but it can be extended.
- I used SourceSinkUtils.distributeRange to split the key range across task indexes (I copied it into the example code).
- There are some unit tests that demonstrate the usage.
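To illustrate the range-splitting step, here is a minimal sketch of dividing the key hash range among subtasks. It is not the code of SourceSinkUtils.distributeRange or of the attached file; the class name KeyRangeSplitter and the method rangeFor are hypothetical, and the only assumption taken from Pulsar is that the key hash range is 0 to 65535.

```java
/**
 * Hypothetical helper: splits the key hash range 0..65535 into one
 * contiguous, non-overlapping slice per parallel subtask.
 */
final class KeyRangeSplitter {
    // Number of key hash slots (0..65535).
    static final int RANGE_SIZE = 65536;

    /** Returns {start, end}, both inclusive, for the given subtask. */
    static int[] rangeFor(int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || parallelism > RANGE_SIZE) {
            throw new IllegalArgumentException("parallelism out of bounds: " + parallelism);
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtask index out of bounds: " + subtaskIndex);
        }
        // Integer division on both boundaries keeps neighbouring slices adjacent:
        // the start of slice i+1 is always the end of slice i plus one.
        int start = (int) ((long) subtaskIndex * RANGE_SIZE / parallelism);
        int end = (int) ((long) (subtaskIndex + 1) * RANGE_SIZE / parallelism) - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        // Show how the slices move when parallelism changes from 2 to 3.
        for (int parallelism : new int[] {2, 3}) {
            for (int i = 0; i < parallelism; i++) {
                int[] r = rangeFor(parallelism, i);
                System.out.println("parallelism=" + parallelism
                        + " subtask=" + i + " range=[" + r[0] + "," + r[1] + "]");
            }
        }
    }
}
```

Because the slice boundaries depend on the parallelism, a slice computed for the old parallelism generally does not match any slice for the new one, which is why the cursor position has to be tracked independently of the task index.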
new-flink-pulsar-connector.zip
Thanks!
When consuming in key-shared mode, the range is persisted by creating a subscription whose name contains it, for example test[0,1000]. The original progress is restored by reading this subscription at startup.
I will implement the persistence of connector key-shared subscriptions according to this solution.
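A rough sketch of how such a range-named subscription could be built and read back is shown below. The naming scheme `<base>[start,end]` is only inferred from the test[0,1000] example above, and the class RangeSubscriptionName and its methods are hypothetical; the actual connector implementation may differ.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for subscription names of the form base[start,end]. */
final class RangeSubscriptionName {
    // Captures the base name and the two range bounds.
    private static final Pattern NAME = Pattern.compile("^(.*)\\[(\\d+),(\\d+)\\]$");

    /** Builds a name such as test[0,1000] from a base name and a key range. */
    static String format(String base, int start, int end) {
        return base + "[" + start + "," + end + "]";
    }

    /** Extracts {start, end} from a name produced by format. */
    static int[] parseRange(String subscriptionName) {
        Matcher m = NAME.matcher(subscriptionName);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a range subscription: " + subscriptionName);
        }
        return new int[] {Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3))};
    }
}
```

With a scheme like this, a restarted task could list the existing subscriptions on the topic, parse their ranges, and resume from the ones that overlap its own slice.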