flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-30088][runtime] Fix excessive state updates for TtlMapState and TtlListState

Open rovboyko opened this issue 3 years ago • 7 comments

What is the purpose of the change

Avoid unnecessary state updates for TtlMapState and TtlListState in case they are not changed. This affects only HashMapStateBackend, MemoryStateBackend and FsStateBackend.

Brief change log

  • add if condition to return original List if no records were expired for TtlListState
  • add if condition to return original Map if no records were expired for TtlMapState

Verifying this change

This change added tests and can be verified as follows:

  • add testStateNotChangedWithoutCleanup to TtlStateTestBase

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

rovboyko avatar Nov 28 '22 04:11 rovboyko

CI report:

  • 42183069605bf6bd92560f870da00419de9d4304 Azure: SUCCESS
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Nov 28 '22 04:11 flinkbot

@azagrebin , @Myasuka , @Jiayi-Liao , could you please help with code review?

rovboyko avatar Dec 09 '22 10:12 rovboyko

Hi @Myasuka ! Thanks for your reply. The main benefit of this change is avoiding of unnecessary stateIterator.update() invocation inside TtlIncrementalCleanup.runCleanup() method. As for now it happens even no keys inside ttlState were expired. And after my change the stateIterator.update() invocation would happen only if any key was expired inside ttlState.

rovboyko avatar Dec 27 '22 05:12 rovboyko

@rovboyko If so, I think we can add such a TTL state benchmark under https://github.com/apache/flink-benchmarks/tree/master/src/main/java/org/apache/flink/state/benchmark to prove this improvement and also help Flink community to track the TTL state performance changes.

Myasuka avatar Dec 27 '22 06:12 Myasuka

@rovboyko I created ticket FLINK-30535 to focus on the TTL state benchmarks.

Myasuka avatar Dec 30 '22 08:12 Myasuka

@Myasuka after implementation of FLINK-30535 and FLINK-33881 I changed the current commit to optimize only TtlMapState.

After the optimization benchamrk results seem not so bad (+17% performance):

Benchmark backendType expiredOption stateVisibility updateType Non-optimized Optimized Units
TtlMapStateBenchmark.mapAdd HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 361.671 521.839 ops/ms
TtlMapStateBenchmark.mapAdd HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 366.513 523.892 ops/ms
TtlMapStateBenchmark.mapAdd HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 364.311 461.037 ops/ms
TtlMapStateBenchmark.mapAdd HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 362.902 510.513 ops/ms
TtlMapStateBenchmark.mapAdd HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 361.08 520.319 ops/ms
TtlMapStateBenchmark.mapAdd HEAP NeverExpired NeverReturnExpired OnReadAndWrite 364.026 524.573 ops/ms
TtlMapStateBenchmark.mapAdd HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 362.199 515.707 ops/ms
TtlMapStateBenchmark.mapAdd HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 357.642 433.35 ops/ms
TtlMapStateBenchmark.mapGet HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 304.937 386.748 ops/ms
TtlMapStateBenchmark.mapGet HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 260.486 321.793 ops/ms
TtlMapStateBenchmark.mapGet HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 324.409 397.574 ops/ms
TtlMapStateBenchmark.mapGet HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 265.357 353.769 ops/ms
TtlMapStateBenchmark.mapGet HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 328.767 405.722 ops/ms
TtlMapStateBenchmark.mapGet HEAP NeverExpired NeverReturnExpired OnReadAndWrite 255.596 338.208 ops/ms
TtlMapStateBenchmark.mapGet HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 317.633 400.027 ops/ms
TtlMapStateBenchmark.mapGet HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 254.176 343.937 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 316.203 412.834 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 327.426 414.538 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 323.163 406.269 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 334.041 420.389 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 346.116 406.019 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP NeverExpired NeverReturnExpired OnReadAndWrite 320.814 405.261 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 340.874 409.718 ops/ms
TtlMapStateBenchmark.mapIsEmpty HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 333.791 408.396 ops/ms
TtlMapStateBenchmark.mapIterator HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 2760.188 3413.68 ops/ms
TtlMapStateBenchmark.mapIterator HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 2322.375 2419.871 ops/ms
TtlMapStateBenchmark.mapIterator HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 2694.602 3372.643 ops/ms
TtlMapStateBenchmark.mapIterator HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 2275.37 2156.808 ops/ms
TtlMapStateBenchmark.mapIterator HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 2723.703 3369.097 ops/ms
TtlMapStateBenchmark.mapIterator HEAP NeverExpired NeverReturnExpired OnReadAndWrite 2324.465 2453.601 ops/ms
TtlMapStateBenchmark.mapIterator HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 2694.324 3364.168 ops/ms
TtlMapStateBenchmark.mapIterator HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 2339.08 2377.672 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 270.326 275.442 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 269.82 282.63 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 271.646 280.797 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 270.449 257.802 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 271.497 274.694 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP NeverExpired NeverReturnExpired OnReadAndWrite 270.721 252.761 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 292.303 268.975 ops/ms
TtlMapStateBenchmark.mapPutAll HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 271.283 277.794 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP Expire3PercentPerIteration NeverReturnExpired OnCreateAndWrite 278.52 345.521 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP Expire3PercentPerIteration NeverReturnExpired OnReadAndWrite 272.15 328.734 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnCreateAndWrite 274.306 358.366 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP Expire3PercentPerIteration ReturnExpiredIfNotCleanedUp OnReadAndWrite 272.916 347.087 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP NeverExpired NeverReturnExpired OnCreateAndWrite 264.072 346.595 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP NeverExpired NeverReturnExpired OnReadAndWrite 273.377 341.417 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnCreateAndWrite 275.702 352.406 ops/ms
TtlMapStateBenchmark.mapUpdate HEAP NeverExpired ReturnExpiredIfNotCleanedUp OnReadAndWrite 275.58 357.394 ops/ms
        total: 32362.908 38118.387  

rovboyko avatar Mar 22 '24 04:03 rovboyko

I updated the PR and published the benchmark results. @azagrebin , @Myasuka , @Jiayi-Liao , could you please help with code review again?

rovboyko avatar Mar 23 '24 17:03 rovboyko