flink-cdc
[FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink
Currently, when the sink is not an instance of TwoPhaseCommittingSink, sinkTo calls input.transform rather than stream.transform. As a result, the pre-write topology added through WithPreWriteTopology is ignored.
    private void sinkTo(
            DataStream<Event> input,
            Sink<Event> sink,
            String sinkName,
            OperatorID schemaOperatorID) {
        DataStream<Event> stream = input;
        // Pre write topology
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
        }
        if (sink instanceof TwoPhaseCommittingSink) {
            addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
        } else {
            // Problem: this transforms `input`, not `stream`, so the pre-write topology is dropped.
            input.transform(
                    SINK_WRITER_PREFIX + sinkName,
                    CommittableMessageTypeInfo.noOutput(),
                    new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
        }
    }
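To make the effect of the bug concrete, here is a minimal, self-contained sketch. It does not use the Flink API: `ToySink`, `ToyPreWriteSink`, `Writer`, `prefixingSink`, `sinkToBuggy`, and `sinkToFixed` are hypothetical stand-ins, and a `List<String>` plays the role of the `DataStream<Event>`. It only illustrates the difference between wiring the writer to `input` and wiring it to `stream`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PreWriteTopologySketch {
    /** Toy stand-in for a sink; NOT Flink's Sink interface. */
    interface ToySink {}

    /** Toy stand-in for WithPreWriteTopology: rewrites the stream before the writer. */
    interface ToyPreWriteSink extends ToySink {
        List<String> addPreWriteTopology(List<String> stream);
    }

    /** Toy writer that records every element it receives. */
    static final class Writer {
        final List<String> received = new ArrayList<>();

        void write(List<String> stream) {
            received.addAll(stream);
        }
    }

    /** Hypothetical pre-write sink that tags each element with "pre:". */
    static ToyPreWriteSink prefixingSink() {
        return stream -> {
            List<String> out = new ArrayList<>();
            for (String element : stream) {
                out.add("pre:" + element);
            }
            return out;
        };
    }

    /** Mirrors the current behavior: the writer is attached to `input`. */
    static List<String> sinkToBuggy(List<String> input, ToySink sink) {
        List<String> stream = input;
        if (sink instanceof ToyPreWriteSink) {
            stream = ((ToyPreWriteSink) sink).addPreWriteTopology(stream);
        }
        Writer writer = new Writer();
        writer.write(input); // the pre-write result in `stream` is never used
        return writer.received;
    }

    /** Mirrors the intended behavior: the writer is attached to `stream`. */
    static List<String> sinkToFixed(List<String> input, ToySink sink) {
        List<String> stream = input;
        if (sink instanceof ToyPreWriteSink) {
            stream = ((ToyPreWriteSink) sink).addPreWriteTopology(stream);
        }
        Writer writer = new Writer();
        writer.write(stream);
        return writer.received;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b");
        System.out.println(sinkToBuggy(events, prefixingSink())); // [a, b]
        System.out.println(sinkToFixed(events, prefixingSink())); // [pre:a, pre:b]
    }
}
```

In this toy model the buggy variant writes the untouched input, while the fixed variant writes the elements produced by the pre-write step. A regression test could assert the same property against a test-only sink.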
(PS: the changes to StarRocksUtils only apply spotless formatting.)
CC @PatrickRen
Good catch @loserwang1024!
Could you please add a test case to prevent later code changes to revert this fix?
I'd like to, but it seems there is currently no pipeline sink that implements WithPreWriteTopology but not TwoPhaseCommittingSink, unless I mock one.
Could you define your own sink in the test itself? Then you have a free hand over what it does and does not do.
Done.
CC @PatrickRen, would you like to help review this PR?
Hi @loserwang1024, could you please backport this patch to the release-3.1 branch so that it can be released with CDC 3.1.1?
I'd like to, but it hasn't been merged to master yet.
@loserwang1024 Could you cherry-pick this commit to release-3.1 branch? Thanks