
[FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks

Open 1996fanrui opened this issue 1 year ago • 2 comments

What is the purpose of the change

[FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks

When using the Adaptive Scheduler and the rescale API, users set a lowerBound and an upperBound for each job vertex, and they expect the parallelism of every vertex to stay between its lowerBound and upperBound.

But when a Flink job has more than one vertex and resources are insufficient, some of the lowerBounds are not respected. How to reproduce this bug:

A job has 2 job vertices, with the following resource requirements:

Vertex1: lowerBound=2, upperBound=2
Vertex2: lowerBound=8, upperBound=8

Both vertices are in the same slotSharingGroup, and only 5 slots are available. This job shouldn't run, because the available slots cannot meet the resource requirement of vertex2.

But the job does run, and the parallelism of vertex2 is 5.

Why does this bug happen?

Flink calculates the minimumRequiredSlots for each slot sharing group. It should be the maximum lowerBound across all vertices in that slot sharing group.

But it is currently using the minimum lowerBound instead.
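To make the difference concrete, here is a minimal sketch of the two calculations applied to the reproduction case above. It is not the actual Flink code: the class and method names are hypothetical, and vertices are reduced to a map from vertex name to lowerBound.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: names here are hypothetical and not part of Flink.
public class MinimumRequiredSlotsSketch {

    /** Behaviour described as the bug: the smallest lowerBound in the group. */
    public static int minRequiredSlotsBuggy(Map<String, Integer> lowerBounds) {
        return lowerBounds.values().stream().mapToInt(Integer::intValue).min().orElse(0);
    }

    /** Intended behaviour: the largest lowerBound in the group. */
    public static int minRequiredSlotsFixed(Map<String, Integer> lowerBounds) {
        return lowerBounds.values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    /** The group can be scheduled only if enough slots are available. */
    public static boolean canSchedule(int availableSlots, int minRequiredSlots) {
        return availableSlots >= minRequiredSlots;
    }

    public static void main(String[] args) {
        // Both vertices share one slot sharing group, as in the reproduction steps.
        Map<String, Integer> lowerBounds = new LinkedHashMap<>();
        lowerBounds.put("Vertex1", 2);
        lowerBounds.put("Vertex2", 8);
        int availableSlots = 5;

        int buggy = minRequiredSlotsBuggy(lowerBounds);
        int fixed = minRequiredSlotsFixed(lowerBounds);

        System.out.println("buggy: required=" + buggy
                + ", schedulable=" + canSchedule(availableSlots, buggy));
        System.out.println("fixed: required=" + fixed
                + ", schedulable=" + canSchedule(availableSlots, fixed));
    }
}
```

With the minimum, the group appears to need only 2 slots, so 5 slots look sufficient and vertex2 ends up below its lowerBound of 8. With the maximum, the group needs 8 slots and the job is correctly held back.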

Brief change log

  • [FLINK-33752][JUnit5 migration] Migrate SlotSharingSlotAllocatorTest to Junit5 and Assertj
  • [FLINK-33960][Scheduler] Fix the bug that Adaptive Scheduler doesn't respect the lowerBound when one flink job has more than 1 tasks
  • [FLINK-33960][Scheduler][refactor] Simplify the implementation of SlotSharingGroupMetaInfo

Verifying this change

  • Added SlotSharingSlotAllocatorTest#testDetermineParallelismWithLowerBoundsInsufficientSlotsForPartialVertices

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

1996fanrui avatar Jan 01 '24 04:01 1996fanrui

CI report:

  • 29434ac8d02d09dc3adf7d4bef11fbcc1bc0a091 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot avatar Jan 01 '24 04:01 flinkbot

After this fix, if the minimum bound isn't satisfied, the Adaptive Scheduler won't start the new rescale. This means the Flink job keeps running with the old parallelism.

I'm not sure whether we should instead cancel the job first and wait for resources until the minimum bound is satisfied. From the semantics of the lower bound, when the resources required by the lower bound cannot be satisfied, the Flink job should not run.
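As a rough illustration of the behaviour after the fix (keep the old parallelism when the new lower bound cannot be met), consider the sketch below. It is a simplification for a single vertex, the names are hypothetical, and it assumes the new parallelism is capped at the upper bound. It does not model the alternative of cancelling the job and waiting for resources.

```java
import java.util.Optional;

// Illustrative only: a single-vertex simplification with hypothetical names.
public class RescaleDecisionSketch {

    /** Returns the new parallelism, or empty if the lower bound cannot be met. */
    public static Optional<Integer> tryRescale(int availableSlots, int lowerBound, int upperBound) {
        if (availableSlots < lowerBound) {
            return Optional.empty(); // not enough slots: no rescale is started
        }
        return Optional.of(Math.min(availableSlots, upperBound));
    }

    /** Parallelism the job runs with after a rescale attempt. */
    public static int parallelismAfterAttempt(
            int currentParallelism, int availableSlots, int lowerBound, int upperBound) {
        // If the rescale is skipped, the job keeps its old parallelism.
        return tryRescale(availableSlots, lowerBound, upperBound).orElse(currentParallelism);
    }

    public static void main(String[] args) {
        // Job currently runs with parallelism 4; new bounds are [8, 8], 5 slots available.
        System.out.println(parallelismAfterAttempt(4, 5, 8, 8));
        // With 10 slots available the rescale goes ahead, capped at the upper bound.
        System.out.println(parallelismAfterAttempt(4, 10, 8, 8));
    }
}
```

In the first call the job stays at its old parallelism, which is the case the question above is about: the job keeps running even though the newly requested lower bound is not met.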

1996fanrui avatar Jan 01 '24 04:01 1996fanrui

Hi @gyfora @mxm , this PR is related to Adaptive Scheduler. Would you mind helping take a look? Big thanks~

1996fanrui avatar Jan 31 '24 10:01 1996fanrui

Thanks @1996fanrui for the PR, I will try to allocate some time to review this in the next few days. I need some extra time to familiarise myself with all the bits :)

gyfora avatar Jan 31 '24 10:01 gyfora

Thanks for looking into this @1996fanrui! I'll take a look.

mxm avatar Jan 31 '24 10:01 mxm