flink icon indicating copy to clipboard operation
flink copied to clipboard

[FLINK-36379] Improve (Global)Committer with UC disabled

Open AHeise opened this issue 1 year ago • 2 comments

What is the purpose of the change

FLINK-36287 disabled UC for all inter-sink connections to adhere to the contract of notifyCheckpointCompleted. This allows us to remove some special casing and improve global committer.

Brief change log

  • Refactor sink test assertions
  • Optimize global committers
  • Optimize committers with UC disabled

See commit messages for more details.

Verifying this change

Covered by tests in

  • org.apache.flink.streaming.runtime.operators.sink
  • org.apache.flink.streaming.api.connector.sink2
  • Various IT cases and E2E tests

I modified / extended the former two suites.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

AHeise avatar Oct 07 '24 14:10 AHeise

CI report:

  • 833fc938f1678f210c9aa0e33ddaf62e7c356c74 Azure: FAILURE
Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

flinkbot avatar Oct 07 '24 14:10 flinkbot

While I understand the idea of the change, I wonder if it the right time or way.

We just cut 2.0 and this is potentially a breaking change since we do not control where users append the global committer in their sink topologies. It's very theoretical and the globalcommitter was marked @Experimental but we should at least discuss it.

As discussed offline. The cut was just for 2.0-preview and there is going to be another cut for the actual 2.0.

I am also a bit torn of the change of operation in the GlobalCommitter and the usage of the underlying infra structure i.e. CheckpointCommittableManager. I probably need another pass to fully understand how well the manager plays with the new semantics of the GlobalCommitter.

It's much easier to reason about it if we disregard async retries (next PR fixes it): In the optimized way, the GlobalCommitter state contains only upstream committables iff it couldn't globally commit in a previous checkpoint yet. This can only happen if an upstream committer didn't receive notifyCheckpointCompleted. There are two reasons for that: lost RPC or race condition of the RPC with the next barrier (low checkpointing interval). If we assume RPC < next checkpoint and no lost RPC, all upstream committables arrive the global committer before the next checkpoint is triggerd. Then, the optimized approach will already globally commit.

The state naturally is also important when the global committer also performs 2PC because there is no upstream committer. Then, the behavior is virtually the same as now.

AHeise avatar Oct 18 '24 07:10 AHeise

Thanks for addressing the comments % please double check that org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testSinkDisorderChangeLogWithRank doesn't fail because of this change 👍

I double-checked: the test uses the SinkOperator wrapping the old SinkFunction and has no changed code.

AHeise avatar Oct 23 '24 06:10 AHeise

@flinkbot run azure

AHeise avatar Oct 23 '24 06:10 AHeise

@flinkbot run azure

AHeise avatar Oct 23 '24 06:10 AHeise

Failure is a known issue FLINK-36591.

AHeise avatar Oct 23 '24 10:10 AHeise