[FLINK-36379] Improve (Global)Committer with UC disabled
What is the purpose of the change
FLINK-36287 disabled unaligned checkpoints (UC) for all inter-sink connections to adhere to the contract of notifyCheckpointComplete. This allows us to remove some special casing and improve the global committer.
Brief change log
- Refactor sink test assertions
- Optimize global committers
- Optimize committers with UC disabled
See commit messages for more details.
Verifying this change
Covered by tests in
- org.apache.flink.streaming.runtime.operators.sink
- org.apache.flink.streaming.api.connector.sink2
- Various IT cases and E2E tests
I modified / extended the former two suites.
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)
Documentation
- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
CI report:
- 833fc938f1678f210c9aa0e33ddaf62e7c356c74 Azure: FAILURE
While I understand the idea of the change, I wonder if it is the right time or way.
We just cut 2.0, and this is potentially a breaking change since we do not control where users append the global committer in their sink topologies. It's very theoretical, and the global committer was marked @Experimental, but we should at least discuss it.
As discussed offline: the cut was just for 2.0-preview, and there is going to be another cut for the actual 2.0.
I am also a bit torn about the change of operation in the GlobalCommitter and the usage of the underlying infrastructure, i.e., CheckpointCommittableManager. I probably need another pass to fully understand how well the manager plays with the new semantics of the GlobalCommitter.
It's much easier to reason about it if we disregard async retries (the next PR addresses them). With the optimization, the GlobalCommitter state contains upstream committables if and only if it could not yet globally commit them in a previous checkpoint. This can only happen if an upstream committer did not receive notifyCheckpointComplete. There are two reasons for that: a lost RPC, or a race between the RPC and the next barrier (with a low checkpointing interval). If we assume that the RPC arrives before the next checkpoint and that no RPC is lost, all upstream committables arrive at the global committer before the next checkpoint is triggered. Then, the optimized approach already commits globally.
Naturally, the state is also important when the global committer itself performs 2PC because there is no upstream committer. In that case, the behavior is virtually the same as now.
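To make the reasoning above more concrete for the case with upstream committers, here is a minimal, hypothetical Java model of the optimized behavior. It assumes a fixed upstream parallelism and plain string committables. `GlobalCommitterSketch`, `onUpstreamCommittables`, and `snapshotState` are illustrative names only, not Flink's GlobalCommitter or CheckpointCommittableManager API, and async retries, subsumed notifications, and recovery are deliberately not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the optimized global committer; NOT Flink's actual API.
public class GlobalCommitterSketch {
    private final int upstreamParallelism;
    // Committables received per checkpoint id, ordered by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Upstream subtask indices that have reported for each pending checkpoint id.
    private final Map<Long, Set<Integer>> reportedSubtasks = new HashMap<>();
    // Committables that were globally committed, in commit order.
    private final List<String> committed = new ArrayList<>();

    public GlobalCommitterSketch(int upstreamParallelism) {
        this.upstreamParallelism = upstreamParallelism;
    }

    // Assumed to be called once an upstream committer forwards its committables,
    // i.e., after it received notifyCheckpointComplete for that checkpoint.
    public void onUpstreamCommittables(long checkpointId, int subtaskIndex, List<String> committables) {
        Set<Integer> reported = reportedSubtasks.computeIfAbsent(checkpointId, id -> new HashSet<>());
        if (!reported.add(subtaskIndex)) {
            return; // ignore a duplicate report from the same subtask for a pending checkpoint
        }
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).addAll(committables);
        commitCompletedCheckpoints();
    }

    // Globally commits checkpoints in ascending order, stopping at the first one
    // that is still missing committables from some upstream subtask.
    private void commitCompletedCheckpoints() {
        Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (reportedSubtasks.get(entry.getKey()).size() < upstreamParallelism) {
                break; // e.g., a lost or late RPC: keep this checkpoint in state
            }
            committed.addAll(entry.getValue());
            reportedSubtasks.remove(entry.getKey());
            it.remove();
        }
    }

    // What would be written to state at the next checkpoint: only committables
    // that could not be globally committed yet.
    public Map<Long, List<String>> snapshotState() {
        Map<Long, List<String>> copy = new TreeMap<>();
        pending.forEach((id, list) -> copy.put(id, new ArrayList<>(list)));
        return copy;
    }

    public List<String> committed() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        GlobalCommitterSketch gc = new GlobalCommitterSketch(2);
        gc.onUpstreamCommittables(1L, 0, List.of("a"));
        // Subtask 1's notification is late, so checkpoint 1 stays in state.
        System.out.println(gc.snapshotState()); // {1=[a]}
        gc.onUpstreamCommittables(1L, 1, List.of("b"));
        System.out.println(gc.committed());     // [a, b]
        System.out.println(gc.snapshotState()); // {}
    }
}
```

In the common case (RPC before the next barrier, no lost RPC), `snapshotState()` is empty at every checkpoint. The in-order loop is just one design choice for this sketch: it keeps a later checkpoint from being committed ahead of an earlier one that is still waiting for a late notification.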
Thanks for addressing the comments, modulo one thing: please double-check that org.apache.flink.table.planner.runtime.stream.sql.TableSinkITCase.testSinkDisorderChangeLogWithRank doesn't fail because of this change 👍
I double-checked: the test uses the SinkOperator wrapping the old SinkFunction and does not exercise any of the changed code.
@flinkbot run azure
Failure is a known issue FLINK-36591.