[FLINK-28861][table] Make UID generation behavior configurable and plan-only by default

Open twalthr opened this issue 3 years ago • 1 comments

What is the purpose of the change

This PR fixes two critical bugs in the stream operator UID generation of the Table API.

  1. Non-deterministic UIDs: Due to the changes for FLIP-190, every operator generated by the planner gets a UID assigned in master. However, the UID is based on a static counter that might return different results depending on the environment. Thus, UIDs are not deterministic, which makes stateful restores impossible, e.g. when going from 1.15.0 -> 1.15.1.

This PR restores the old pre-1.15 behavior for regular Table API. It only adds UIDs if the operator has been created from a compiled plan. A compiled plan makes the UIDs static and thus deterministic.
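To make the problem concrete, here is a minimal toy model in Python, not Flink code. The `Planner` class, the `compile_plan`/`translate_plan` helpers, and the `<counter>_<name>` UID shape are all hypothetical. It only illustrates why a process-wide counter yields unstable UIDs, while IDs stored in a compiled plan stay fixed.

```python
import itertools


class Planner:
    """Toy planner (hypothetical, not Flink code)."""

    # Process-wide counter, shared by every planner instance.
    _counter = itertools.count(1)

    def translate(self, operators):
        # Each UID depends on whatever the shared counter returns next.
        return [f"{next(Planner._counter)}_{name}" for name in operators]


def compile_plan(operators):
    # A compiled plan stores the node IDs next to the operators, freezing them.
    return [(node_id, name) for node_id, name in enumerate(operators, start=1)]


def translate_plan(plan):
    # UIDs are read back from the plan; the shared counter is never consulted.
    return [f"{node_id}_{name}" for node_id, name in plan]


query = ["source", "calc", "sink"]

# Same query translated twice in one process: the counter has moved on.
first_run = Planner().translate(query)
second_run = Planner().translate(query)

# Same query restored twice from one compiled plan: identical UIDs.
plan = compile_plan(query)
restored_a = translate_plan(plan)
restored_b = translate_plan(plan)
```

In this sketch `first_run` and `second_run` differ although the query is identical, which is the situation that breaks a stateful restore. The two restores from the compiled plan agree.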

  2. UID format limits migrations: The current UID is not future-proof for migrations. The ExecNode version should not be part of the UID; otherwise, operator migration won't be possible once plan migration has been executed. See the FLIP-190 example that drops a version in the plan once operator migration has been performed. Given that the plan feature is marked as @Experimental, this change should still be possible without providing backwards compatibility. The second commit will not be backported to 1.15, as it would make upgrades within a patch version impossible. Breaking compatibility across minor versions should be fine, as we don't provide state compatibility for SQL yet. A new config option allows restoring the UID format of 1.15 and provides more flexibility around UIDs in general.
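The effect of embedding the version can be sketched with a small template renderer. This is an illustration only: the placeholder names (`<id>`, `<type>`, `<version>`, `<transformation>`), both format strings, and the `render_uid` helper are assumptions made for the example, not the PR's actual option values.

```python
def render_uid(fmt, node):
    """Fill a UID template; the <...> placeholder names are hypothetical."""
    uid = fmt
    for key, value in node.items():
        uid = uid.replace(f"<{key}>", str(value))
    return uid


# Hypothetical formats: one embeds the ExecNode version, one does not.
WITH_VERSION = "<id>_<type>_<version>_<transformation>"
WITHOUT_VERSION = "<id>_<transformation>"

# The same operator before and after a (hypothetical) version bump.
before = {"id": 3, "type": "stream-exec-calc", "version": 1, "transformation": "calc"}
after = {**before, "version": 2}

# With the version embedded, the UID changes and state no longer maps.
uid_changed = render_uid(WITH_VERSION, before) != render_uid(WITH_VERSION, after)

# Without the version, the UID survives the bump.
uid_stable = render_uid(WITHOUT_VERSION, before) == render_uid(WITHOUT_VERSION, after)
```

Under these assumptions, only the version-free format keeps the operator addressable after its ExecNode version changes.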

Brief change log

  • Introduce config options for when to add a UID and what its format looks like
  • Propagate information through the planner whether compilation is the source of the translation
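The decision these two changes enable could look roughly like the following sketch. Only the plan-only default comes from the PR title. The `UidGeneration` enum, its `ALWAYS` and `DISABLED` members, and the `should_assign_uid` function are hypothetical names used for illustration.

```python
from enum import Enum


class UidGeneration(Enum):
    """Hypothetical modes; only PLAN_ONLY as the default is stated in the PR."""

    PLAN_ONLY = "plan-only"
    ALWAYS = "always"
    DISABLED = "disabled"


def should_assign_uid(from_compiled_plan, mode=UidGeneration.PLAN_ONLY):
    # from_compiled_plan is the information the planner propagates:
    # whether the translation was triggered by a compiled plan.
    if mode is UidGeneration.ALWAYS:
        return True
    if mode is UidGeneration.DISABLED:
        return False
    # PLAN_ONLY: UIDs are only set when they are backed by a static plan.
    return from_compiled_plan


# The regular Table API path gets no UID by default; a compiled plan does.
regular_api = should_assign_uid(from_compiled_plan=False)
compiled = should_assign_uid(from_compiled_plan=True)
```

Keeping the decision in one place means the rest of the translation code does not need to know why a UID is or is not set.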

Verifying this change

This change added tests and can be verified as follows:

  • TransformationsTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? JavaDocs

twalthr, Aug 11 '22 13:08

CI report:

  • f8d74475ca90c19fc8c9c62df2074f4ddf253b2a Azure: SUCCESS

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

flinkbot, Aug 11 '22 13:08

I found a nicer approach and will push an update soon.

twalthr, Aug 12 '22 06:08

Thanks for the review @zentol. Besides the need to pass around the isCompile flag, I found another issue caused by the fact that transformations can be cached in a transient variable. In the end, I found a much nicer solution by just moving the flag one call higher in the planning phase. The code looks much cleaner now, and I also added an additional test. Please have another look.

twalthr, Aug 12 '22 10:08