[FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState
What is the purpose of the change
Avoid unnecessary state updates for TtlMapState and TtlListState when their contents have not changed. This affects only HashMapStateBackend, MemoryStateBackend and FsStateBackend.
Brief change log
- add if condition to return original List if no records were expired for TtlListState
- add if condition to return original Map if no records were expired for TtlMapState
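The idea behind both change-log items can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `TtlFilterSketch`, `getUnexpired`, and the use of plain expiration timestamps as map values are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; this is not Flink's TtlMapState.
class TtlFilterSketch {

    /**
     * Filters out expired entries. Each value is an expiration timestamp (an
     * assumption of this sketch). If nothing has expired, the original map
     * instance is returned so callers can detect "no change" by reference.
     */
    static <K> Map<K, Long> getUnexpired(Map<K, Long> original, long now) {
        Map<K, Long> unexpired = new HashMap<>();
        for (Map.Entry<K, Long> entry : original.entrySet()) {
            if (entry.getValue() > now) {
                unexpired.put(entry.getKey(), entry.getValue());
            }
        }
        // Same size means no entry was dropped: hand back the original instance.
        return unexpired.size() == original.size() ? original : unexpired;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("a", 200L);
        state.put("b", 300L);
        // Nothing expired at t=100, so the same instance comes back.
        System.out.println(getUnexpired(state, 100L) == state);
        // "a" expired at t=250, so a filtered copy comes back.
        System.out.println(getUnexpired(state, 250L) == state);
    }
}
```

Returning the original instance lets the caller skip the write-back with a cheap reference comparison instead of a deep equality check.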
Verifying this change
This change added tests and can be verified as follows:
- add testStateNotChangedWithoutCleanup to TtlStateTestBase
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
Documentation
- Does this pull request introduce a new feature? no
CI report:
- 42183069605bf6bd92560f870da00419de9d4304 Azure: SUCCESS
@azagrebin , @Myasuka , @Jiayi-Liao , could you please help with code review?
Hi @Myasuka! Thanks for your reply. The main benefit of this change is avoiding unnecessary stateIterator.update() invocations inside the TtlIncrementalCleanup.runCleanup() method. Currently the update happens even if no keys inside ttlState have expired. With my change, stateIterator.update() is invoked only if at least one key inside ttlState has expired.
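To make the effect concrete, here is a rough sketch of a cleanup loop that writes an entry back only when the filter returned a different instance. All names (`CleanupSketch`, `Entry`, `dropNegative`) are hypothetical stand-ins and negative numbers play the role of expired records; the real TtlIncrementalCleanup works against Flink's state iterator, which is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins for illustration; these are not Flink classes.
class CleanupSketch {

    /** A state entry that counts how often it is written back. */
    static final class Entry<S> {
        S state;
        int updateCalls = 0;

        Entry(S state) {
            this.state = state;
        }

        void update(S newState) {
            state = newState;
            updateCalls++;
        }
    }

    /** Returns the same list instance when no element is negative ("expired" here). */
    static List<Integer> dropNegative(List<Integer> values) {
        List<Integer> kept = new ArrayList<>();
        for (Integer v : values) {
            if (v >= 0) {
                kept.add(v);
            }
        }
        return kept.size() == values.size() ? values : kept;
    }

    /** Writes an entry back only if the filter returned a different instance. */
    static <S> void runCleanup(List<Entry<S>> entries, UnaryOperator<S> filter) {
        for (Entry<S> entry : entries) {
            S cleaned = filter.apply(entry.state);
            if (cleaned != entry.state) {
                entry.update(cleaned);
            }
        }
    }

    public static void main(String[] args) {
        List<Entry<List<Integer>>> entries = new ArrayList<>();
        entries.add(new Entry<>(Arrays.asList(1, 2, 3)));   // nothing to drop
        entries.add(new Entry<>(Arrays.asList(4, -1, 5)));  // one element to drop
        runCleanup(entries, CleanupSketch::dropNegative);
        for (Entry<List<Integer>> e : entries) {
            System.out.println(e.state + " updates=" + e.updateCalls);
        }
    }
}
```

In this sketch the first entry is never written back, while the second is written back exactly once.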
@rovboyko If so, I think we can add such a TTL state benchmark under https://github.com/apache/flink-benchmarks/tree/master/src/main/java/org/apache/flink/state/benchmark to prove this improvement and also help the Flink community track TTL state performance changes.
@rovboyko I created ticket FLINK-30535 to focus on the TTL state benchmarks.
@Myasuka After FLINK-30535 and FLINK-33881 were implemented, I changed the current commit to optimize only TtlMapState.
After the optimization, the benchmark results look reasonably good (about +17% in total throughput):
| Benchmark | backendType | expiredOption | stateVisibility | updateType | Non-optimized | Optimized | Units |
|---|---|---|---|---|---|---|---|
| TtlMapStateBenchmark.mapAdd | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 361.671 | 521.839 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 366.513 | 523.892 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 364.311 | 461.037 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 362.902 | 510.513 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 361.08 | 520.319 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 364.026 | 524.573 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 362.199 | 515.707 | ops/ms |
| TtlMapStateBenchmark.mapAdd | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 357.642 | 433.35 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 304.937 | 386.748 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 260.486 | 321.793 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 324.409 | 397.574 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 265.357 | 353.769 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 328.767 | 405.722 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 255.596 | 338.208 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 317.633 | 400.027 | ops/ms |
| TtlMapStateBenchmark.mapGet | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 254.176 | 343.937 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 316.203 | 412.834 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 327.426 | 414.538 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 323.163 | 406.269 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 334.041 | 420.389 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 346.116 | 406.019 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 320.814 | 405.261 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 340.874 | 409.718 | ops/ms |
| TtlMapStateBenchmark.mapIsEmpty | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 333.791 | 408.396 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 2760.188 | 3413.68 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 2322.375 | 2419.871 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 2694.602 | 3372.643 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 2275.37 | 2156.808 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 2723.703 | 3369.097 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 2324.465 | 2453.601 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 2694.324 | 3364.168 | ops/ms |
| TtlMapStateBenchmark.mapIterator | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 2339.08 | 2377.672 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 270.326 | 275.442 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 269.82 | 282.63 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 271.646 | 280.797 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 270.449 | 257.802 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 271.497 | 274.694 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 270.721 | 252.761 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 292.303 | 268.975 | ops/ms |
| TtlMapStateBenchmark.mapPutAll | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 271.283 | 277.794 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnCreateAndWrite | 278.52 | 345.521 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | Expire3PercentPerIteration | NeverReturnExpired | OnReadAndWrite | 272.15 | 328.734 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 274.306 | 358.366 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | Expire3PercentPerIteration | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 272.916 | 347.087 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | NeverExpired | NeverReturnExpired | OnCreateAndWrite | 264.072 | 346.595 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | NeverExpired | NeverReturnExpired | OnReadAndWrite | 273.377 | 341.417 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnCreateAndWrite | 275.702 | 352.406 | ops/ms |
| TtlMapStateBenchmark.mapUpdate | HEAP | NeverExpired | ReturnExpiredIfNotCleanedUp | OnReadAndWrite | 275.58 | 357.394 | ops/ms |
| total: | | | | | 32362.908 | 38118.387 | ops/ms |
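For reference, the headline figure can be reproduced from the totals row: (38118.387 - 32362.908) / 32362.908 works out to about 17.8%. Note that the sum is dominated by the mapIterator rows, and a few individual rows regress. The helper below is only an illustrative sketch (`SpeedupSketch` and `percentChange` are names made up for this example), using numbers copied from the table.

```java
// Hypothetical helper for illustration; the inputs are copied from the table above.
class SpeedupSketch {

    /** Relative change from baseline to candidate, in percent. */
    static double percentChange(double baseline, double candidate) {
        return (candidate - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        // Totals from the last table row.
        System.out.printf("total: %+.1f%%%n", percentChange(32362.908, 38118.387));
        // One row that regressed:
        // mapIterator, Expire3PercentPerIteration, ReturnExpiredIfNotCleanedUp, OnReadAndWrite.
        System.out.printf("mapIterator row: %+.1f%%%n", percentChange(2275.37, 2156.808));
    }
}
```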
I updated the PR and published the benchmark results. @azagrebin , @Myasuka , @Jiayi-Liao , could you please help with code review again?