[FLINK-37510] Avoid duplicate calculations for SlidingEventTimeWindows
What is the purpose of the change
This PR aims to avoid duplicate calculations in SlidingEventTimeWindows.
The original code puts the expression timestamp - size inside the loop condition, so it is re-evaluated on every iteration even though neither operand changes.
We should hoist it out of the loop and calculate it just once.
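The change described above can be sketched as follows. This is a minimal, self-contained illustration rather than the actual Flink source: the class SlidingWindowSketch, the helper windowStart (a stand-in assumed to behave like TimeWindow.getWindowStartWithOffset), and the use of long[] pairs in place of TimeWindow are all hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SlidingWindowSketch {

    // Hypothetical stand-in for TimeWindow.getWindowStartWithOffset (assumed behavior):
    // the latest slide-aligned start that is <= timestamp.
    static long windowStart(long timestamp, long offset, long slide) {
        long remainder = (timestamp - offset) % slide;
        return remainder < 0 ? timestamp - (remainder + slide) : timestamp - remainder;
    }

    // Before: "timestamp - size" is written in the loop condition,
    // so the source evaluates it on every iteration.
    static List<long[]> assignBefore(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = windowStart(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // After: the loop-invariant bound is computed once, before the loop.
    static List<long[]> assignAfter(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        final long lastStart = windowStart(timestamp, offset, slide);
        final long lower = timestamp - size;
        for (long start = lastStart; start > lower; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Print the [start, end) pairs from both variants side by side.
        List<long[]> before = assignBefore(10, 6, 2, 0);
        List<long[]> after = assignAfter(10, 6, 2, 0);
        for (int i = 0; i < before.size(); i++) {
            System.out.println(Arrays.toString(before.get(i)) + " " + Arrays.toString(after.get(i)));
        }
    }
}
```

Both variants are meant to return the same windows for the same input; only the place where the lower bound is computed differs.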
Brief change log
Avoid duplicate calculations for SlidingEventTimeWindows
Verifying this change
This change is already covered by existing tests, such as SlidingEventTimeWindowsTest.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)
Documentation
- Does this pull request introduce a new feature? (no)
CI report:
- b76dcdffdb0555cbb1eb0e4cbbd9430817eeccba Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
LGTM
ping @aljoscha @StephanEwen cc @1996fanrui @davidradl
I may have missed something, but could you please describe what the issue is here and how we could check that it is gone after this commit?
It's obvious. I have added a description.
Thanks for updating the description; however, at least for me, it is still not obvious even after that update.
I tried to measure the difference with the help of JMH, like this:
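import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

// Class-level defaults; the @BenchmarkMode on each method below overrides the mode.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class FlinkTest {

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(FlinkTest.class.getSimpleName())
                .build();
        new Runner(opt).run();
    }

    // Fixed inputs shared by both benchmarks.
    @State(Scope.Thread)
    public static class MyState {
        public long offset = 10000;
        public long timestamp = 2000;
        public long size = 1234;
        public long slide = 12;
    }

    // Current code: "timestamp - size" sits in the loop condition.
    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void current(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }

    // Proposed code: the lower bound is computed once before the loop.
    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void improved(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        final long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        final long lower = timestamp - size;
        for (long start = lastStart; start > lower; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }
}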
My results for JDK 17:
Benchmark            Mode  Cnt     Score    Error   Units
FlinkTest.current   thrpt   10  2309.684 ± 29.933  ops/ms
FlinkTest.improved  thrpt   10  2296.488 ± 47.739  ops/ms
For JDK 21:
Benchmark            Mode  Cnt     Score    Error   Units
FlinkTest.current   thrpt   10  2257.881 ± 24.168  ops/ms
FlinkTest.improved  thrpt   10  2270.597 ± 13.188  ops/ms
So I don't see any significant difference: on both JDKs the two scores differ by less than the reported error. Probably either the JDK already performs this optimization itself, or the rest of the code takes so much more time that the benefit of the change is negligible.
So my question is still the same: how can we check that this PR fixes something?
Or did I miss anything?
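The "no significant difference" reading can be reproduced with simple arithmetic on the scores reported above. The sketch below only checks whether the score ± error ranges overlap, which is a rough heuristic and not a formal significance test; the class JmhOverlapCheck and the method overlaps are hypothetical names used only for this illustration.

```java
final class JmhOverlapCheck {

    // True if the ranges [a - errA, a + errA] and [b - errB, b + errB] overlap.
    static boolean overlaps(double a, double errA, double b, double errB) {
        return Math.abs(a - b) <= errA + errB;
    }

    public static void main(String[] args) {
        // Scores and errors copied from the JMH output above (ops/ms).
        System.out.println("jdk17 ranges overlap: " + overlaps(2309.684, 29.933, 2296.488, 47.739));
        System.out.println("jdk21 ranges overlap: " + overlaps(2257.881, 24.168, 2270.597, 13.188));
    }
}
```

For JDK 17 the scores differ by about 13.2 ops/ms against a combined error of about 77.7, and for JDK 21 by about 12.7 against about 37.4, so the ranges overlap in both runs.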
@snuyanzin The benchmark overlooks a few points. First, the cost of a single subtraction is obviously very small; otherwise we would have to doubt the usability of Java. Second, the JVM will optimize the expression once the code has moved past interpretation.
On the other hand, avoiding duplicate expressions is a best practice known to everybody.
@snuyanzin Do you have time to review this again? This improvement just avoids the duplicate calculation; it is not expected to bring a large performance gain.
ping @aljoscha @JunRuiLee @XComp @yunfengzhou-hub @reswqa