[FLINK-35029][state/forst] Store timer in JVM heap when use async state backend

Open fredia opened this issue 1 year ago • 2 comments

What is the purpose of the change

This PR adds support for storing timers in the JVM heap when the async state backend is used. Note: timers are now stored as raw keyed state instead of managed keyed state, and the snapshot/restore of the JVM heap timers is managed by InternalTimeServiceManager.
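
For readers unfamiliar with the idea, the following is a minimal, self-contained sketch of a heap-held timer queue whose entries can be collected per key group at snapshot time. It is not the code in this PR: `HeapTimerSketch`, `Timer`, `register`, `advanceTo`, and `snapshotKeyGroup` are hypothetical names, and the `floorMod`-based key-group assignment is a simplification assumed for illustration, not Flink's real assignment.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified stand-in for a key-grouped heap timer queue (not Flink's class). */
final class HeapTimerSketch {

    /** A timer: the time at which it fires plus the key it belongs to. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final int numKeyGroups;

    // Timers live on the JVM heap, ordered by timestamp (ties broken by key).
    private final PriorityQueue<Timer> heap =
            new PriorityQueue<>(
                    Comparator.<Timer>comparingLong(t -> t.timestamp)
                            .thenComparing(t -> t.key));

    HeapTimerSketch(int numKeyGroups) {
        this.numKeyGroups = numKeyGroups;
    }

    /** Assumed key-group assignment for this sketch only. */
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    void register(long timestamp, String key) {
        heap.add(new Timer(timestamp, key));
    }

    /** Removes and returns all timers with timestamp <= time, earliest first. */
    List<Timer> advanceTo(long time) {
        List<Timer> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().timestamp <= time) {
            fired.add(heap.poll());
        }
        return fired;
    }

    /** Collects the pending timers of one key group, as a snapshot would write them out. */
    List<Timer> snapshotKeyGroup(int keyGroup) {
        List<Timer> out = new ArrayList<>();
        for (Timer t : heap) {
            if (keyGroupOf(t.key) == keyGroup) {
                out.add(t);
            }
        }
        return out;
    }

    int size() {
        return heap.size();
    }

    public static void main(String[] args) {
        HeapTimerSketch timers = new HeapTimerSketch(4);
        timers.register(30L, "b");
        timers.register(10L, "a");
        System.out.println("fired: " + timers.advanceTo(10L).size());
        System.out.println("pending: " + timers.size());
    }
}
```

The sketch keeps a single heap and filters by key group on snapshot to stay short; an implementation that needs efficient per-key-group snapshots would more likely partition the timers by key group up front.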

Brief change log

  • Implement ForStKeyedStateBackend#create() -> KeyGroupedInternalPriorityQueue
  • Support initializing timeServiceManager with the async keyed state backend in StreamTaskStateInitializerImpl

Verifying this change

This change adds tests and can be verified as follows:

  • StateBackendTestV2Base#testKeyGroupedInternalPriorityQueue

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

fredia, Oct 12 '24 04:10