[FLINK-32594][runtime] Support EndOfStreamTrigger and isOutputOnEOF operator attribute to optimize task deployment
What is the purpose of the change
This pull request allows using the BLOCKING result partition type in streaming execution mode when an operator only outputs records at the end of its input. Checkpointing is still supported after all of the non-pipelined tasks have finished. See [FLIP-331] for details.
Brief change log
- Add an internal method to `StreamOperator` that allows implementations to report whether the operator will only output records at the end of input and whether it supports internal sorting (a minimal, hypothetical sketch follows this list).
- Add the `EOFCoGroupOperator` to optimize `DataStream#coGroup` when output happens only at the end of input.
- Add the `EOFAggregationOperator` to optimize `DataStream#aggregate` when output happens only at the end of input.
- Add the `EndOfStreamTrigger` in `GlobalWindows` to support a trigger that fires only when the input stream reaches EOF.
- Optimize the job so that the output-on-EOF operator is scheduled and executed in batch mode.
- If the job contains tasks that write to blocking-typed partitions, the periodic checkpoint scheduler is not started when the `JobStatus` switches to `RUNNING`; it only starts once no task is writing data to a blocking-typed partition.
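To make the attribute concrete, here is a minimal sketch of an operator that only emits at end of input. The attribute hook isOutputOnEOF shown at the bottom is a hypothetical name used for illustration; the actual internal method added to StreamOperator by this PR may be named and wired differently.

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sums all input values and emits a single record when the input ends.
public class BufferUntilEofOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long>, BoundedOneInput {

    private long sum;

    @Override
    public void processElement(StreamRecord<Long> element) {
        // Accumulate only; nothing is emitted while the input is still running.
        sum += element.getValue();
    }

    @Override
    public void endInput() {
        // The single output record is produced at end of input.
        output.collect(new StreamRecord<>(sum));
    }

    // Hypothetical attribute hook: reporting true would let the runtime use
    // BLOCKING edges for this operator's output and delay downstream deployment.
    public boolean isOutputOnEOF() {
        return true;
    }
}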
Verifying this change
This change added tests and can be verified as follows:
- Add the integration test `OperatorOutputEOFITCase` and unit tests in `StreamingJobGraphGeneratorTest` to confirm that, in `RuntimeExecutionMode.STREAMING`, the operator is scheduled correctly and the downstream node is not deployed before the output-on-EOF operator node has finished.
- Add unit tests in `StreamGraphGeneratorTest` and `StreamGraphGeneratorBatchExecutionTest` to confirm that the operator sets the `InputRequirement` correctly.
To demonstrate our optimization improvements, we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.
We run benchmarks on a MacBook with the latest Flink 1.18-snapshot and parallelism=1. Default standalone cluster configuration is used except:
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
Execute DataStream#CoGroup
This benchmark uses DataStream#coGroup to process records from two bounded inputs. Each input will generate records with (key = i, value = i) for i from 1 to 8*10^7.
Below is the DataStream program code snippet.
data1.coGroup(data2)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(GlobalWindows.createWithEndOfStreamTrigger())
.apply(new CustomCoGroupFunction())
.addSink(...);
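For context, the two bounded inputs described above could be generated roughly as follows; this setup is an illustrative sketch (class and variable names are assumptions) and not the exact source used in the benchmark.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Each bounded input produces (key = i, value = i) for i from 1 to 8*10^7.
DataStream<Tuple2<Integer, Integer>> data1 =
        env.fromSequence(1L, 80_000_000L)
           .map(i -> Tuple2.of(i.intValue(), i.intValue()))
           .returns(Types.TUPLE(Types.INT, Types.INT));
DataStream<Tuple2<Integer, Integer>> data2 =
        env.fromSequence(1L, 80_000_000L)
           .map(i -> Tuple2.of(i.intValue(), i.intValue()))
           .returns(Types.TUPLE(Types.INT, Types.INT));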
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode and optimized streaming mode after this PR.
The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode.
| STREAMING | BATCH | Optimized STREAMING |
|---|---|---|
| 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) |
Execute DataStream#Aggregate
This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys. More specifically, the source will generate records with (key = i, value = i) for i from 1 to 8*10^5, and repeat this process 100 times.
Below is the DataStream program code snippet.
events
.keyBy(value -> value.f0)
.window(GlobalWindows.createWithEndOfStreamTrigger())
.aggregate(new Aggregator())
.addSink(...);
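The Aggregator referenced above is not shown in the snippet; a simple per-key sum along the following lines would fit the program. This is an illustrative sketch, and the aggregation used in the actual benchmark may differ.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Sums the values of each key; the window result is emitted once per key
// when the end-of-stream trigger fires.
public class Aggregator implements AggregateFunction<Tuple2<Integer, Integer>, Long, Long> {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(Tuple2<Integer, Integer> value, Long accumulator) {
        return accumulator + value.f1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}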
The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode and optimized streaming mode after this PR.
The result shows that DataStream#aggregate in optimized streaming mode can be 10X as fast as streaming mode and 11% faster than batch mode.
| STREAMING | BATCH | Optimized STREAMING |
|---|---|---|
| 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) |
OperatorDelayedDeploy
The following program demonstrates the scenario described in the motivation section. The program pre-processes records from a bounded source (Source1) using an operator (Process1) that only emits results after its input has ended. Then another operator (Process2) processes the records emitted by Process1 together with records from an unbounded source (Source2), and emits results with low processing latency in real time.
source1.keyBy(value -> value.f0)
.window(GlobalWindows.createWithEndOfStreamTrigger())
.aggregate(new MyAggregator()).name("Process1")
.connect(source2.keyBy(value -> value.f0))
.transform("Process2", Types.INT, new MyProcessOperator())
.addSink(...);
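MyAggregator and MyProcessOperator are placeholders in the snippet above. For illustration only, a two-input Process2 could look roughly like the sketch below; the record types and the enrichment logic are assumptions and not the code shipped with this PR.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Input 1: pre-aggregated (key, value) results emitted by Process1 at EOF.
// Input 2: real-time (key, value) records from the unbounded Source2.
public class MyProcessOperator extends AbstractStreamOperator<Integer>
        implements TwoInputStreamOperator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {

    // Kept in memory for simplicity; a production operator would use keyed state.
    private final Map<Integer, Integer> aggregatedByKey = new HashMap<>();

    @Override
    public void processElement1(StreamRecord<Tuple2<Integer, Integer>> element) {
        // With this PR, Process2 is deployed only after Process1 has finished,
        // so these results are available before real-time records arrive.
        aggregatedByKey.put(element.getValue().f0, element.getValue().f1);
    }

    @Override
    public void processElement2(StreamRecord<Tuple2<Integer, Integer>> element) {
        // Enrich the real-time record and emit immediately to keep latency low.
        int base = aggregatedByKey.getOrDefault(element.getValue().f0, 0);
        output.collect(new StreamRecord<>(base + element.getValue().f1));
    }
}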
Resource Utilization
We can use this program to demonstrate that it requires fewer slot resources. More specifically, suppose we configure the standalone cluster with taskmanager.numberOfTaskSlots = 2 and place Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups, as sketched below. Before this FLIP the program fails to be deployed, while after this FLIP it can be deployed successfully, because Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
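A sketch of assigning the four slot sharing groups (the group names are illustrative):

source1.slotSharingGroup("group-source1")
    .keyBy(value -> value.f0)
    .window(GlobalWindows.createWithEndOfStreamTrigger())
    .aggregate(new MyAggregator()).name("Process1").slotSharingGroup("group-process1")
    .connect(source2.slotSharingGroup("group-source2").keyBy(value -> value.f0))
    .transform("Process2", Types.INT, new MyProcessOperator()).slotSharingGroup("group-process2")
    .addSink(...);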
Additionally, we can use this program to demonstrate that it achieves higher performance, because Process2 does not need to buffer the records emitted by Source2 in memory while Process1 has not yet reached EOF. More specifically, before this FLIP the program can fail with an OOM error when the number of input records is high, whereas after this FLIP it finishes successfully without OOM.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? not documented
CI report:
- a32faf56ef17af1a8ef127a4501fc85c2dfcf336 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
I added org.apache.flink.streaming.api.windowing.assigners.EndOfStreamWindows.assignWindows to the archunit-violations file so that org.apache.flink.architecture.ArchitectureTest does not flag the annotation, which has also been done for other window assigners.
@flinkbot run azure