[FLINK-35165][runtime/coordination] AdaptiveBatch Scheduler should not restrict the default source parallelism to the max parallelism set
What is the purpose of the change
With the AdaptiveBatchScheduler, the current behavior is: if both execution.batch.adaptive.auto-parallelism.default-source-parallelism and execution.batch.adaptive.auto-parallelism.max-parallelism are specified, and execution.batch.adaptive.auto-parallelism.default-source-parallelism is greater than execution.batch.adaptive.auto-parallelism.max-parallelism, then the source parallelism is bounded to execution.batch.adaptive.auto-parallelism.max-parallelism.
This is problematic because:
- The source vertex is unique and does not have any upstream vertices.
- Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
- Limiting source parallelism by the downstream vertices' max parallelism is therefore incorrect.
For example, in the case of "high filter selectivity with huge amounts of data to read", this has the following issue:
- Setting a high execution.batch.adaptive.auto-parallelism.max-parallelism so that the source parallelism can be set higher can lead to small blocks and sub-optimal performance. A high max-parallelism also requires careful tuning of network buffer configurations, which is unnecessary in cases where it is not otherwise required, just so that the source parallelism can be set high.
The proposed solution is to decouple the configs execution.batch.adaptive.auto-parallelism.default-source-parallelism and execution.batch.adaptive.auto-parallelism.max-parallelism, and not bound the value of source parallelism by execution.batch.adaptive.auto-parallelism.max-parallelism.
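For illustration, consider a configuration like the following (a minimal sketch; the option constants are the real BatchExecutionOptions used in the tests below, but the values are made up). Before this change, the source parallelism derived from default-source-parallelism would be silently clamped to max-parallelism; with this change it is not:

final Configuration configuration = new Configuration();
// Cap for auto-derived parallelism of downstream vertices.
configuration.set(
        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, 128);
// Desired default parallelism for sources; previously clamped to 128 above,
// after this change the sources can actually run at 1024.
configuration.set(
        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, 1024);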
Verifying this change
This change is already covered by existing tests, such as (please describe tests).
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- d657863b988d7a3915eb2d1d4ebf69ada9242a0a Azure: FAILURE
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
cc @SinBex and @JunRuiLee for reviews.
Thanks, @venkata91, for your contribution! After reviewing this PR, I'm concerned that it entirely removes the limit that the source parallelism should be lower than the source JobVertex's max parallelism. I think the goal of this PR is to ensure that the source parallelism isn't limited by the config option execution.batch.adaptive.auto-parallelism.max-parallelism, but still respects the max parallelism of the source JobVertex.
WDYT?
I think that makes sense. Basically what you're saying is: if the source's max parallelism is determined by the source itself and is < the default-source-parallelism config, we should cap it by the source-computed max parallelism, correct? If so, I agree with that.
Yes, that's correct.
@JunRuiLee Sorry for the late reply. I looked at the code again and it does look to be doing what we expected. Can you please point me to the corresponding code reference?
@JunRuiLee Gentle ping!
Sorry for the late response. My point is that the upper bound returned by computeSourceParallelismUpperBound should be the minimum of execution.batch.adaptive.auto-parallelism.default-source-parallelism and the maximum parallelism of the JobVertex itself, rather than just considering execution.batch.adaptive.auto-parallelism.default-source-parallelism.
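In other words, something like this minimal sketch of the intended logic (hypothetical signature and parameter names, not the actual Flink code):

// Hypothetical sketch: the upper bound should respect both values.
int computeSourceParallelismUpperBound(
        int globalDefaultSourceParallelism, int vertexMaxParallelism) {
    // Cap the configured default source parallelism by the JobVertex's own
    // max parallelism instead of returning the config value unconditionally.
    return Math.min(globalDefaultSourceParallelism, vertexMaxParallelism);
}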
A simple reproducible case is to replace the executeJob method in AdaptiveBatchSchedulerITCase with the following code:
private void executeJob(Boolean useSourceParallelismInference) throws Exception {
    final Configuration configuration = createConfiguration();
    configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
    for (int i = 0; i < 3; ++i) {
        SlotSharingGroup group =
                SlotSharingGroup.newBuilder("group" + i)
                        .setCpuCores(1.0)
                        .setTaskHeapMemory(MemorySize.parse("100m"))
                        .build();
        slotSharingGroups.add(group);
    }

    DataStream<Long> source1;
    DataStream<Long> source2;
    if (useSourceParallelismInference) {
        source1 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
                                WatermarkStrategy.noWatermarks(),
                                "source1")
                        .slotSharingGroup(slotSharingGroups.get(0));
        source2 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
                                WatermarkStrategy.noWatermarks(),
                                "source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    } else {
        source1 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(-1)
                        .name("source1")
                        .slotSharingGroup(slotSharingGroups.get(0))
                        .setMaxParallelism(2);
        source2 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(SOURCE_PARALLELISM_2)
                        .name("source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    }

    source1.union(source2)
            .rescale()
            .map(new NumberCounter())
            .name("map")
            .slotSharingGroup(slotSharingGroups.get(2));

    env.execute();
}
Then run the testScheduling case.
@JunRuiLee First of all, apologies for the very late reply!
I think the issue is: if the source parallelism is not explicitly set, and execution.batch.adaptive.auto-parallelism.max-parallelism is set < execution.batch.adaptive.auto-parallelism.default-source-parallelism, then the source vertex max parallelism is set to the value of execution.batch.adaptive.auto-parallelism.max-parallelism.
"My point is that the upper bound returned by computeSourceParallelismUpperBound should be the minimum of execution.batch.adaptive.auto-parallelism.default-source-parallelism and the maximum parallelism of the JobVertex itself."
If the above happens, then we will be back to square one, where the source parallelism is bounded by execution.batch.adaptive.auto-parallelism.max-parallelism even if execution.batch.adaptive.auto-parallelism.default-source-parallelism is set greater than that.
Thoughts?
To illustrate with an example test case, try changing executeJob and createConfiguration to the code below and run testScheduling:
private void executeJob(Boolean useSourceParallelismInference) throws Exception {
    final Configuration configuration = createConfiguration();
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
    for (int i = 0; i < 3; ++i) {
        SlotSharingGroup group =
                SlotSharingGroup.newBuilder("group" + i)
                        .setCpuCores(1.0)
                        .setTaskHeapMemory(MemorySize.parse("100m"))
                        .build();
        slotSharingGroups.add(group);
    }

    DataStream<Long> source1;
    DataStream<Long> source2;
    if (useSourceParallelismInference) {
        source1 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
                                WatermarkStrategy.noWatermarks(),
                                "source1")
                        .slotSharingGroup(slotSharingGroups.get(0));
        source2 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
                                WatermarkStrategy.noWatermarks(),
                                "source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    } else {
        source1 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(-1)
                        .name("source1")
                        .slotSharingGroup(slotSharingGroups.get(0));
        source2 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(-1)
                        .name("source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    }

    source1.union(source2)
            .rescale()
            .map(new NumberCounter())
            .name("map")
            .slotSharingGroup(slotSharingGroups.get(2));

    env.execute();
}

private static Configuration createConfiguration() {
    final Configuration configuration = new Configuration();
    configuration.set(RestOptions.BIND_PORT, "0");
    configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofMillis(5000L));
    configuration.set(
            BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
            DEFAULT_MAX_PARALLELISM);
    configuration.set(
            BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
            2 * DEFAULT_MAX_PARALLELISM);
    configuration.set(
            BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
            MemorySize.parse("150kb"));
    configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
    configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, 1);
    return configuration;
}
@JunRuiLee Gentle ping!
Gentle ping @SinBex!
Sorry for the delayed response due to the National Day holiday in China.
Currently, there are three key fields related to parallelism:
- defaultMaxParallelism: This comes from the config options auto-parallelism.max-parallelism or default.parallelism.
- globalDefaultSourceParallelism: This comes from the config option default-source-parallelism or the field defaultMaxParallelism.
- JobVertex max parallelism: This is either user-configured by calling the setMaxParallelism method or falls back to defaultMaxParallelism.
"I think the issue is: if the source parallelism is not explicitly set, and execution.batch.adaptive.auto-parallelism.max-parallelism is set < execution.batch.adaptive.auto-parallelism.default-source-parallelism, then the source vertex max parallelism is set to the value of execution.batch.adaptive.auto-parallelism.max-parallelism."
The main issue here is that if a user has set the max parallelism of the source (i.e., called the setMaxParallelism method), we must respect that max parallelism. However, the current implementation does not account for this and only considers globalDefaultSourceParallelism.
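Putting the three fields together, the desired behavior might look like this minimal sketch (hypothetical names and signature, not the actual Flink code):

// Hypothetical sketch of the desired upper bound, assuming:
// - userSetMaxParallelism: true if setMaxParallelism was called on the source vertex
// - vertexMaxParallelism: the source JobVertex's max parallelism
// - globalDefaultSourceParallelism: default-source-parallelism,
//   falling back to defaultMaxParallelism when unset
int sourceParallelismUpperBound(
        boolean userSetMaxParallelism,
        int vertexMaxParallelism,
        int globalDefaultSourceParallelism) {
    if (userSetMaxParallelism) {
        // An explicitly configured vertex max parallelism must be respected.
        return Math.min(globalDefaultSourceParallelism, vertexMaxParallelism);
    }
    // Otherwise the source is not capped by auto-parallelism.max-parallelism.
    return globalDefaultSourceParallelism;
}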
@JunRuiLee I see, got it. Basically you're saying that if the user sets the max parallelism on the JobVertex, then that shouldn't be overridden. I think I figured out how to handle this case and updated the changes. I am in the process of adding the unit/IT tests and will update that shortly. Please take a look at the approach whenever you get a chance.