flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-37510] Avoid duplicate calculations for SlidingEventTimeWindows

Open beliefer opened this issue 9 months ago • 9 comments

What is the purpose of the change

This PR aim to avoid duplicate calculations for SlidingEventTimeWindows. The original code put the expression timestamp - size into the loop, so it causes duplicate calculation. We should put them out of the loop and just calculate once.

Brief change log

Avoid duplicate calculations for SlidingEventTimeWindows

Verifying this change

This change is already covered by existing tests, such as (SlidingEventTimeWindowsTest).

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

beliefer avatar Mar 19 '25 07:03 beliefer

CI report:

  • b76dcdffdb0555cbb1eb0e4cbbd9430817eeccba Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Mar 19 '25 08:03 flinkbot

LGTM

nilmadhab avatar Mar 19 '25 08:03 nilmadhab

ping @aljoscha @StephanEwen cc @1996fanrui @davidradl

beliefer avatar Mar 19 '25 11:03 beliefer

Probably I missed something however can you please describe what is the issue here and how we could check that it is gone after this commit?

It's obviously. I have added the description.

beliefer avatar Mar 24 '25 02:03 beliefer

It's obviously. I have added the description.

Thanks for updating the description however at least for me it is not obvious even after that update.

I tried to see the difference with help of jmh like

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class FlinkTest {

    public static void main(String[] args) throws RunnerException {

        Options opt = new OptionsBuilder()
                .include(FlinkTest.class.getSimpleName())
                .build();

        new Runner(opt).run();
    }

    @State(Scope.Thread)
    public static class MyState {
        public long offset = 10000;
        public long timestamp = 2000;
        public long size = 1234;
        public long slide = 12;
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void current(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }

    @Benchmark
    @BenchmarkMode(Mode.Throughput)
    public void improved(Blackhole blackhole, MyState state) {
        long offset = state.offset;
        long timestamp = state.timestamp;
        long size = state.size;
        long slide = state.slide;
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        final long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        final long lower = timestamp - size;
        for (long start = lastStart; start > lower; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        blackhole.consume(windows);
    }

}

my results for jdk17

 Benchmark            Mode  Cnt     Score    Error   Units
 FlinkTest.current   thrpt   10  2309.684 ± 29.933  ops/ms
 FlinkTest.improved  thrpt   10  2296.488 ± 47.739  ops/ms

for jdk21

 Benchmark            Mode  Cnt     Score    Error   Units
 FlinkTest.current   thrpt   10  2257.881 ± 24.168  ops/ms
 FlinkTest.improved  thrpt   10  2270.597 ± 13.188  ops/ms

so I don't see any significant difference (probably either this optimization has been already done by jdk itself or the rest of the code consumes much more time so the improvement brings so low benefit that it could be neglected)

So the question is still same from my side: how can we check that this PR fixes something?

Or did I miss anything?

snuyanzin avatar Mar 24 '25 08:03 snuyanzin

@snuyanzin The benchmark ignored some questions. Obviously, the overhead of minus is very small. Otherwise, we should doubt the availability of JAVA. Second, The JVM will optimize it after interpretation.

beliefer avatar Mar 24 '25 09:03 beliefer

On the other hand, avoid duplicate expressions is always a best practices know for anybody.

beliefer avatar Mar 24 '25 09:03 beliefer

@snuyanzin Could you have time to review again ? This Improvement is just avoid the duplicate calculation which is not have a very strong improvement in performance.

beliefer avatar Apr 01 '25 10:04 beliefer

ping @aljoscha @JunRuiLee @XComp @yunfengzhou-hub @reswqa

beliefer avatar May 20 '25 08:05 beliefer