[FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set

Open venkata91 opened this issue 10 months ago • 13 comments

What is the purpose of the change

With AdaptiveBatchScheduler, when both execution.batch.adaptive.auto-parallelism.default-source-parallelism and execution.batch.adaptive.auto-parallelism.max-parallelism are specified and execution.batch.adaptive.auto-parallelism.default-source-parallelism is greater than execution.batch.adaptive.auto-parallelism.max-parallelism, the source parallelism is currently capped at execution.batch.adaptive.auto-parallelism.max-parallelism.

  • The source vertex is unique in that it has no upstream vertices. Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
  • Limiting source parallelism by the downstream vertices' max parallelism is therefore incorrect.

For example, in the case of high filter selectivity with huge amounts of data to read, this has the following issues:

  • Setting a high "execution.batch.adaptive.auto-parallelism.max-parallelism" just so that source parallelism can be raised can lead to small blocks and sub-optimal performance. It also requires careful tuning of network buffer configurations, which is unnecessary when the only goal is a higher source parallelism.

The proposed solution is to decouple the configs execution.batch.adaptive.auto-parallelism.default-source-parallelism and execution.batch.adaptive.auto-parallelism.max-parallelism, so that the source parallelism is no longer bounded by execution.batch.adaptive.auto-parallelism.max-parallelism.
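
To make the difference concrete, here is a minimal sketch of the two behaviors in plain Java. It is not Flink code: the class name, method names, and the values 2000 and 500 are hypothetical and chosen only for illustration.

```java
/** Hypothetical illustration only; none of these names exist in Flink. */
final class SourceParallelismCoupling {
    private SourceParallelismCoupling() {}

    /** Behavior described above: the source default is clamped by max-parallelism. */
    public static int coupled(int defaultSourceParallelism, int maxParallelism) {
        return Math.min(defaultSourceParallelism, maxParallelism);
    }

    /** Proposed behavior: the source default is used as configured, ignoring max-parallelism. */
    public static int decoupled(int defaultSourceParallelism, int maxParallelism) {
        return defaultSourceParallelism;
    }

    public static void main(String[] args) {
        int defaultSourceParallelism = 2000; // illustrative value
        int maxParallelism = 500;            // illustrative value
        System.out.println("coupled   = " + coupled(defaultSourceParallelism, maxParallelism));
        System.out.println("decoupled = " + decoupled(defaultSourceParallelism, maxParallelism));
    }
}
```

With these assumed values, the coupled variant yields 500 while the decoupled variant keeps 2000, so a large read can be spread over many source tasks without raising the max parallelism of downstream vertices.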

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

venkata91 avatar Apr 28 '24 17:04 venkata91

CI report:

  • d657863b988d7a3915eb2d1d4ebf69ada9242a0a Azure: FAILURE
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Apr 28 '24 18:04 flinkbot

cc @SinBex and @JunRuiLee for reviews.

venkata91 avatar Apr 30 '24 03:04 venkata91

Thanks, @venkata91, for your contribution! After reviewing this PR, I'm concerned that it entirely removes the limit that source parallelism should be lower than the source jobVertex's max parallelism. I think the goal of this PR is to ensure source parallelism isn't limited by the config option execution.batch.adaptive.auto-parallelism.max-parallelism, while still respecting the max parallelism of the source jobVertex.

WDYT?

I think that makes sense. Basically, what you're saying is that if the source's max parallelism is determined by the source itself and is less than the default-source-parallelism config, we should cap it at the source-computed max parallelism, correct? If so, I agree with that.

venkata91 avatar May 13 '24 16:05 venkata91

Yes, that's correct.

JunRuiLee avatar May 14 '24 01:05 JunRuiLee

@JunRuiLee Sorry for the late reply. I looked at the code again and it does appear to be doing what we expected. Can you please point me to the corresponding code reference?

venkata91 avatar Jun 13 '24 18:06 venkata91

@JunRuiLee Gentle ping!

venkata91 avatar Jun 22 '24 01:06 venkata91

Sorry for the late response. My point is that the upper bound returned by computeSourceParallelismUpperBound should be the minimum of execution.batch.adaptive.auto-parallelism.default-source-parallelism and the maximum parallelism of the JobVertex itself, rather than just considering execution.batch.adaptive.auto-parallelism.default-source-parallelism.
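
As a rough illustration of the bound described above, the following plain-Java sketch takes the minimum of the two values. It is not the actual computeSourceParallelismUpperBound implementation; the class name, method signature, and sample values are assumptions made for this example.

```java
/** Hypothetical illustration only; this is not the Flink method of the same purpose. */
final class SourceUpperBoundSketch {
    private SourceUpperBoundSketch() {}

    /**
     * Upper bound for an inferred source parallelism: never above the configured
     * default source parallelism and never above the vertex's own max parallelism.
     */
    public static int upperBound(int globalDefaultSourceParallelism, int vertexMaxParallelism) {
        if (globalDefaultSourceParallelism < 1 || vertexMaxParallelism < 1) {
            throw new IllegalArgumentException("parallelism values must be positive");
        }
        return Math.min(globalDefaultSourceParallelism, vertexMaxParallelism);
    }

    public static void main(String[] args) {
        // A source capped with setMaxParallelism(2), with an assumed source default of 8.
        System.out.println(upperBound(8, 2));
    }
}
```

Under this reading, a source whose max parallelism is 2 would never be scheduled with more than 2 subtasks, regardless of how high the default source parallelism is configured.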

A simple reproducible case is to replace the executeJob method in AdaptiveBatchSchedulerITCase with the following code:

    private void executeJob(Boolean useSourceParallelismInference) throws Exception {
        final Configuration configuration = createConfiguration();
        configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();

        for (int i = 0; i < 3; ++i) {
            SlotSharingGroup group =
                    SlotSharingGroup.newBuilder("group" + i)
                            .setCpuCores(1.0)
                            .setTaskHeapMemory(MemorySize.parse("100m"))
                            .build();
            slotSharingGroups.add(group);
        }

        DataStream<Long> source1;
        DataStream<Long> source2;

        if (useSourceParallelismInference) {
            source1 =
                    env.fromSource(
                                    new TestingParallelismInferenceNumberSequenceSource(
                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
                                    WatermarkStrategy.noWatermarks(),
                                    "source1")
                            .slotSharingGroup(slotSharingGroups.get(0));
            source2 =
                    env.fromSource(
                                    new TestingParallelismInferenceNumberSequenceSource(
                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
                                    WatermarkStrategy.noWatermarks(),
                                    "source2")
                            .slotSharingGroup(slotSharingGroups.get(1));
        } else {
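            // source1 leaves its parallelism unset (-1) so the scheduler decides it,
            // but caps the vertex with an explicit max parallelism of 2.
            source1 =
                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                            .setParallelism(-1)
                            .name("source1")
                            .slotSharingGroup(slotSharingGroups.get(0))
                            .setMaxParallelism(2);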
            source2 =
                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                            .setParallelism(SOURCE_PARALLELISM_2)
                            .name("source2")
                            .slotSharingGroup(slotSharingGroups.get(1));
        }

        source1.union(source2)
                .rescale()
                .map(new NumberCounter())
                .name("map")
                .slotSharingGroup(slotSharingGroups.get(2));

        env.execute();
    }

Then run the testScheduling case.

JunRuiLee avatar Jun 23 '24 07:06 JunRuiLee

@JunRuiLee First of all, apologies for the very late reply!

I think the issue is that if source parallelism is not explicitly set and execution.batch.adaptive.auto-parallelism.max-parallelism is set lower than execution.batch.adaptive.auto-parallelism.default-source-parallelism, then the source vertex max parallelism is set to the value of execution.batch.adaptive.auto-parallelism.max-parallelism.

My point is that the upper bound returned by computeSourceParallelismUpperBound should be the minimum of execution.batch.adaptive.auto-parallelism.default-source-parallelism and the maximum parallelism of the JobVertex itself

If the above happens, we will be back to square one, where the source parallelism is bounded by execution.batch.adaptive.auto-parallelism.max-parallelism even if execution.batch.adaptive.auto-parallelism.default-source-parallelism is set greater than that.
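
The concern can be sketched with a few lines of plain Java. This is an illustration under assumptions, not Flink code: the class and method names are hypothetical, and the values 256 and 128 are made up. It assumes that a source without an explicit setMaxParallelism call inherits the configured max-parallelism as its vertex max parallelism.

```java
/** Hypothetical illustration only; none of these names exist in Flink. */
final class FallbackBoundSketch {
    private FallbackBoundSketch() {}

    /**
     * Bound for a source whose max parallelism was never set by the user.
     * Assumption: the vertex max parallelism then falls back to the configured max-parallelism.
     */
    public static int boundWithoutUserMax(int defaultSourceParallelism, int configuredMaxParallelism) {
        int vertexMaxParallelism = configuredMaxParallelism; // assumed fallback
        return Math.min(defaultSourceParallelism, vertexMaxParallelism);
    }

    public static void main(String[] args) {
        // default-source-parallelism = 256, max-parallelism = 128 (illustrative values)
        System.out.println(boundWithoutUserMax(256, 128));
    }
}
```

With these assumed values the result is 128, which is the configured max-parallelism again, so taking the plain minimum would reintroduce the original restriction whenever the vertex max parallelism is only a fallback value.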

Thoughts?

venkata91 avatar Sep 23 '24 04:09 venkata91

To illustrate with an example test case, try changing executeJob and createConfiguration to the code below and run testScheduling:

    private void executeJob(Boolean useSourceParallelismInference) throws Exception {
        final Configuration configuration = createConfiguration();

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();

        for (int i = 0; i < 3; ++i) {
            SlotSharingGroup group =
                    SlotSharingGroup.newBuilder("group" + i)
                            .setCpuCores(1.0)
                            .setTaskHeapMemory(MemorySize.parse("100m"))
                            .build();
            slotSharingGroups.add(group);
        }

        DataStream<Long> source1;
        DataStream<Long> source2;

        if (useSourceParallelismInference) {
            source1 =
                    env.fromSource(
                                    new TestingParallelismInferenceNumberSequenceSource(
                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
                                    WatermarkStrategy.noWatermarks(),
                                    "source1")
                            .slotSharingGroup(slotSharingGroups.get(0));
            source2 =
                    env.fromSource(
                                    new TestingParallelismInferenceNumberSequenceSource(
                                            0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
                                    WatermarkStrategy.noWatermarks(),
                                    "source2")
                            .slotSharingGroup(slotSharingGroups.get(1));
        } else {
            source1 =
                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                            .setParallelism(-1)
                            .name("source1")
                            .slotSharingGroup(slotSharingGroups.get(0));
            source2 =
                    env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                            .setParallelism(-1)
                            .name("source2")
                            .slotSharingGroup(slotSharingGroups.get(1));
        }

        source1.union(source2)
                .rescale()
                .map(new NumberCounter())
                .name("map")
                .slotSharingGroup(slotSharingGroups.get(2));

        env.execute();
    }

    private static Configuration createConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "0");
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofMillis(5000L));
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
                DEFAULT_MAX_PARALLELISM);
        // Deliberately set the default source parallelism above the max parallelism.
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                2 * DEFAULT_MAX_PARALLELISM);
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
                MemorySize.parse("150kb"));
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);

        return configuration;
    }

venkata91 avatar Sep 23 '24 04:09 venkata91

@JunRuiLee Gentle ping!

venkata91 avatar Sep 30 '24 16:09 venkata91

Gentle ping @SinBex!

venkata91 avatar Oct 05 '24 04:10 venkata91

Sorry for the delayed response due to the National Day holiday in China.

Currently, there are three key fields related to parallelism:

  1. defaultMaxParallelism: This comes from the config option auto-parallelism.max-parallelism or default.parallelism.
  2. globalDefaultSourceParallelism: This comes from the config option default-source-parallelism or the field defaultMaxParallelism.
  3. JobVertex max parallelism: This is either user-configured by calling the setMaxParallelism method or falls back to defaultMaxParallelism.

I think the issue is, if source parallelism is not explicitly set and if execution.batch.adaptive.auto-parallelism.max-parallelism is set < execution.batch.adaptive.auto-parallelism.default-source-parallelism then the source vertex max parallelism is set to the value of execution.batch.adaptive.auto-parallelism.max-parallelism.

The main issue here is that, if a user has set the max parallelism of the source (by calling the setMaxParallelism method), I think we must respect this max parallelism. However, the current implementation does not account for this and only considers globalDefaultSourceParallelism.
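
The relationship between the three fields can be sketched as follows. This is a plain-Java illustration, not the scheduler's code: the class and method names are hypothetical, the precedence within each "or" is an assumption (the first-named option wins when present), and sourceUpperBound shows only one possible way to respect a user-set max parallelism, not necessarily the approach taken in this PR.

```java
import java.util.OptionalInt;

/** Hypothetical illustration only; names and precedence rules are assumptions. */
final class ParallelismFieldsSketch {
    private ParallelismFieldsSketch() {}

    /** Field 1: auto-parallelism.max-parallelism if set, otherwise the default parallelism. */
    public static int defaultMaxParallelism(OptionalInt autoMaxParallelism, int defaultParallelism) {
        return autoMaxParallelism.orElse(defaultParallelism);
    }

    /** Field 2: default-source-parallelism if set, otherwise defaultMaxParallelism. */
    public static int globalDefaultSourceParallelism(
            OptionalInt defaultSourceParallelism, int defaultMaxParallelism) {
        return defaultSourceParallelism.orElse(defaultMaxParallelism);
    }

    /** Field 3: the user's setMaxParallelism value if present, otherwise defaultMaxParallelism. */
    public static int vertexMaxParallelism(OptionalInt userMaxParallelism, int defaultMaxParallelism) {
        return userMaxParallelism.orElse(defaultMaxParallelism);
    }

    /**
     * One possible bound: cap by the vertex max parallelism only when the user set it
     * explicitly, so that a mere fallback value does not restrict the source.
     */
    public static int sourceUpperBound(OptionalInt userMaxParallelism, int globalDefaultSourceParallelism) {
        return userMaxParallelism.isPresent()
                ? Math.min(userMaxParallelism.getAsInt(), globalDefaultSourceParallelism)
                : globalDefaultSourceParallelism;
    }

    public static void main(String[] args) {
        int defaultMax = defaultMaxParallelism(OptionalInt.of(128), 4);
        int sourceDefault = globalDefaultSourceParallelism(OptionalInt.of(256), defaultMax);
        System.out.println(sourceUpperBound(OptionalInt.of(2), sourceDefault));
        System.out.println(sourceUpperBound(OptionalInt.empty(), sourceDefault));
    }
}
```

The design choice in this sketch is to keep track of whether the max parallelism came from the user or from the fallback, since the two cases cannot be told apart once only the resolved integer is kept.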

JunRuiLee avatar Oct 09 '24 03:10 JunRuiLee

@JunRuiLee I see, got it. Basically, you're saying that if the user sets the max parallelism on the jobVertex, it shouldn't be overridden. I think I figured out how to handle this case and have updated the changes. I am in the process of adding the unit/IT tests and will update the PR shortly. Please take a look at the approach whenever you get a chance.

venkata91 avatar Oct 16 '24 23:10 venkata91