
[BUG] FlinkPulsarSource parallelism change

Open • aryemazouz opened this issue 4 years ago • 1 comment

It looks like FlinkPulsarSource doesn't handle parallelism changes correctly.

I looked through the code but didn't find anything that handles a change in parallelism.

I am using the key hash feature (Pulsar client and broker version 2.7.0).

Solution suggestion: our current use case is a single topic per job (the topic can be partitioned or non-partitioned).

I came up with a simple, fast solution for keeping track of the current Pulsar cursor using the existing Pulsar API; it is in the attached file. I would appreciate comments on it.

The algorithm is explained in the documentation of the PartitionedTopicCursor class (in the attached file).

  • Currently the solution supports only a single topic (partitioned or non-partitioned) and does not support all Flink features, but it can be extended.
  • I used SourceSinkUtils.distributeRange to split the hash range across task indexes (it is copied into the example code).
  • There are some unit tests that demonstrate the usage.
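To illustrate why a parallelism change matters for key-hash ranges, here is a minimal sketch of splitting the hash space across subtask indexes. The exact signature and split policy of SourceSinkUtils.distributeRange are not shown in this issue, so `rangeForSubtask`, `HashRange`, and the even contiguous split with the remainder going to the first subtasks are hypothetical. The sketch also assumes a 65,536-slot hash space ([0, 65535]), which is Pulsar's default Key_Shared hash range size.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {
    // Assumption: Key_Shared hash space of 65536 slots, i.e. [0, 65535].
    static final int HASH_RANGE_SIZE = 65536;

    // Hypothetical inclusive hash range [start, end].
    record HashRange(int start, int end) {
        @Override
        public String toString() {
            return "[" + start + "," + end + "]";
        }
    }

    // Hypothetical helper (not the connector's API): split the hash space into
    // `parallelism` contiguous ranges and return the one owned by subtask `index`.
    // The first (HASH_RANGE_SIZE % parallelism) subtasks get one extra slot.
    static HashRange rangeForSubtask(int parallelism, int index) {
        if (parallelism <= 0 || index < 0 || index >= parallelism) {
            throw new IllegalArgumentException("invalid parallelism/index");
        }
        int base = HASH_RANGE_SIZE / parallelism;
        int remainder = HASH_RANGE_SIZE % parallelism;
        int start = index * base + Math.min(index, remainder);
        int size = base + (index < remainder ? 1 : 0);
        return new HashRange(start, start + size - 1);
    }

    public static void main(String[] args) {
        for (int p : new int[] {2, 3}) {
            List<HashRange> ranges = new ArrayList<>();
            for (int i = 0; i < p; i++) {
                ranges.add(rangeForSubtask(p, i));
            }
            System.out.println("parallelism " + p + ": " + ranges);
        }
    }
}
```

Running `main` prints `parallelism 2: [[0,32767], [32768,65535]]` and `parallelism 3: [[0,21845], [21846,43690], [43691,65535]]`. After scaling from 2 to 3, subtask 1's new range [21846,43690] straddles both old ranges, so its starting position cannot be taken from any single old subtask's state.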

new-flink-pulsar-connector.zip

Thanks!

aryemazouz • Jan 26 '21 15:01

When consuming in Key_Shared mode, each consumer's hash range is persisted by creating a dedicated subscription named after it, e.g. test[0,1000]. At startup, the original progress is restored by reading this subscription.
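A minimal sketch of the naming side of this scheme, assuming subscriptions are named `<prefix>[<start>,<end>]` as in the test[0,1000] example. The helper names (`subscriptionName`, `parse`, `subscriptionsToRestore`) are hypothetical, and the overlap rule for choosing which persisted subscriptions a subtask should restore from after its range changes is an assumption, not the connector's implementation. The list of existing subscription names could come from, for example, the Pulsar admin API's `getSubscriptions` for the topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RangeSubscriptionSketch {
    // Hypothetical naming convention: "<prefix>[<start>,<end>]", e.g. "test[0,1000]".
    private static final Pattern NAME = Pattern.compile("^(.*)\\[(\\d+),(\\d+)\\]$");

    // Hypothetical inclusive hash range [start, end].
    record HashRange(int start, int end) {
        boolean overlaps(HashRange other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Build the subscription name that persists progress for one range.
    static String subscriptionName(String prefix, HashRange r) {
        return prefix + "[" + r.start() + "," + r.end() + "]";
    }

    // Parse a subscription name back into its range; returns null if the name
    // does not follow the convention or uses a different prefix.
    static HashRange parse(String prefix, String subscription) {
        Matcher m = NAME.matcher(subscription);
        if (!m.matches() || !m.group(1).equals(prefix)) {
            return null;
        }
        return new HashRange(Integer.parseInt(m.group(2)), Integer.parseInt(m.group(3)));
    }

    // Assumed restore rule: pick every persisted range subscription whose range
    // overlaps the range this subtask owns now (e.g. after a parallelism change).
    static List<String> subscriptionsToRestore(String prefix, HashRange owned, List<String> existing) {
        List<String> result = new ArrayList<>();
        for (String s : existing) {
            HashRange r = parse(prefix, s);
            if (r != null && r.overlaps(owned)) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> existing = List.of("test[0,32767]", "test[32768,65535]", "other[0,65535]");
        HashRange owned = new HashRange(21846, 43690);
        System.out.println(subscriptionName("test", owned));
        System.out.println(subscriptionsToRestore("test", owned, existing));
    }
}
```

Here `main` prints `test[21846,43690]` and then `[test[0,32767], test[32768,65535]]`: a subtask whose new range spans two old ranges finds both old subscriptions. How their positions are combined when resuming is outside this sketch.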

I will implement persistence of the connector's Key_Shared subscriptions based on this solution.

jianyun8023 • Jan 29 '21 01:01