
[FLINK-35149][cdc-composer] Fix DataSinkTranslator#sinkTo ignoring pre-write topology if not TwoPhaseCommittingSink

Open loserwang1024 opened this issue 10 months ago • 6 comments

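Currently, when the sink is not an instance of TwoPhaseCommittingSink, DataSinkTranslator#sinkTo calls input.transform(...) instead of stream.transform(...). As a result, any pre-write topology added through WithPreWriteTopology#addPreWriteTopology is ignored.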

```java
private void sinkTo(
        DataStream<Event> input,
        Sink<Event> sink,
        String sinkName,
        OperatorID schemaOperatorID) {
    DataStream<Event> stream = input;
    // Pre-write topology
    if (sink instanceof WithPreWriteTopology) {
        stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
    }

    if (sink instanceof TwoPhaseCommittingSink) {
        addCommittingTopology(sink, stream, sinkName, schemaOperatorID);
    } else {
        // Bug: the writer is attached to `input`, not `stream`, so the
        // operators added by addPreWriteTopology above never feed the writer.
        input.transform(
                SINK_WRITER_PREFIX + sinkName,
                CommittableMessageTypeInfo.noOutput(),
                new DataSinkWriterOperatorFactory<>(sink, schemaOperatorID));
    }
}
```
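For comparison, the corrected control flow can be sketched in a simplified, self-contained model. Everything in it is hypothetical: Sink, WithPreWriteTopology and TwoPhaseCommittingSink are stand-in interfaces (not Flink's), a stream is modeled as a list of operator names, transform(...) only appends a name, and the two-phase branch is a placeholder for addCommittingTopology. The one point it illustrates is that the writer is attached to stream rather than input.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of DataSinkTranslator#sinkTo after the fix.
// These interfaces only mimic the shape of Flink's sink interfaces;
// they are NOT the real Flink API.
public class SinkToFixSketch {

    /** Stand-in for any sink. */
    interface Sink {}

    /** Stand-in for sinks that add operators before the writer. */
    interface WithPreWriteTopology extends Sink {
        List<String> addPreWriteTopology(List<String> stream);
    }

    /** Stand-in for sinks that need a writer and a committer. */
    interface TwoPhaseCommittingSink extends Sink {}

    /** Returns a copy of the pipeline with one more operator appended. */
    static List<String> transform(List<String> stream, String operatorName) {
        List<String> next = new ArrayList<>(stream);
        next.add(operatorName);
        return next;
    }

    static List<String> sinkTo(List<String> input, Sink sink, String sinkName) {
        List<String> stream = input;
        // Pre-write topology
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
        }
        if (sink instanceof TwoPhaseCommittingSink) {
            // Placeholder for addCommittingTopology(...): writer, then committer.
            List<String> written = transform(stream, "Sink Writer: " + sinkName);
            return transform(written, "Sink Committer: " + sinkName);
        }
        // The fix: build the writer on `stream`, not `input`,
        // so the pre-write operators stay in the pipeline.
        return transform(stream, "Sink Writer: " + sinkName);
    }

    public static void main(String[] args) {
        WithPreWriteTopology sink = s -> transform(s, "PreWrite");
        System.out.println(sinkTo(List.of("Source"), sink, "demo"));
        // prints [Source, PreWrite, Sink Writer: demo]
    }
}
```

With the buggy input.transform(...) call, the same model would produce [Source, Sink Writer: demo], with the PreWrite operator missing.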

(PS: the changes to StarRocksUtils only apply Spotless formatting.)

loserwang1024, Apr 18 '24 02:04

CC @PatrickRen

loserwang1024, Apr 18 '24 02:04

Good catch @loserwang1024!

Could you please add a test case to prevent later code changes from reverting this fix?

pvary, Apr 18 '24 19:04

> Could you please add a test case to prevent later code changes to revert this fix?

I'd like to, but there is currently no pipeline sink that implements WithPreWriteTopology without also implementing TwoPhaseCommittingSink, so I would have to mock one.

loserwang1024, May 02 '24 06:05

> > Could you please add a test case to prevent later code changes to revert this fix?

> I'd like to, but it seems no pipeline sink which is WithPreWriteTopology but not TwoPhaseCommittingSink now unless i mock one.

Could you define your own sink in the test itself? Then you have full control over what it does and does not do.

pvary, May 02 '24 15:05

> Could you define your own one in the test itself? Then you have free hands what it does, and does not...

Done.

loserwang1024, May 08 '24 08:05
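The idea of a test-local mock sink can be illustrated with a hypothetical, self-contained model; it is not the test added in this PR, and none of the names below come from the Flink CDC code base. PreWriteOnlySink implements only the stand-in WithPreWriteTopology interface, translate(...) models the translator with a buggy flag that reproduces the input.transform(...) mistake, and preWriteKept(...) is the regression check: it passes for the fixed path and fails for the buggy one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a regression test for the pre-write topology bug.
// The interfaces mimic Flink's sink interfaces but are NOT the real API.
public class PreWriteTopologyRegressionSketch {

    interface Sink {}

    interface WithPreWriteTopology extends Sink {
        List<String> addPreWriteTopology(List<String> stream);
    }

    /** Mock sink defined in the test: pre-write topology, no two-phase commit. */
    static final class PreWriteOnlySink implements WithPreWriteTopology {
        @Override
        public List<String> addPreWriteTopology(List<String> stream) {
            List<String> next = new ArrayList<>(stream);
            next.add("PreWrite");
            return next;
        }
    }

    /** Models the non-two-phase branch; buggy == true attaches the writer to input. */
    static List<String> translate(List<String> input, Sink sink, boolean buggy) {
        List<String> stream = input;
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology) sink).addPreWriteTopology(stream);
        }
        List<String> writerInput = buggy ? input : stream;
        List<String> plan = new ArrayList<>(writerInput);
        plan.add("Writer");
        return plan;
    }

    /** Regression check: the pre-write operator must exist and precede the writer. */
    static boolean preWriteKept(List<String> plan) {
        int pre = plan.indexOf("PreWrite");
        return pre >= 0 && pre < plan.indexOf("Writer");
    }

    public static void main(String[] args) {
        List<String> fixed = translate(List.of("Source"), new PreWriteOnlySink(), false);
        List<String> buggy = translate(List.of("Source"), new PreWriteOnlySink(), true);
        System.out.println(fixed + " -> " + preWriteKept(fixed)); // [Source, PreWrite, Writer] -> true
        System.out.println(buggy + " -> " + preWriteKept(buggy)); // [Source, Writer] -> false
    }
}
```

Because the mock is local to the test, it can be kept deliberately minimal: it only needs to add an operator that the check can look for.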

CC @PatrickRen, could you help review this PR?

loserwang1024, May 13 '24 06:05

Hi @loserwang1024, could you please backport this patch to release-3.1 branch so that it could be released with CDC 3.1.1?

yuxiqian, May 29 '24 06:05

> Hi @loserwang1024, could you please backport this patch to release-3.1 branch so that it could be released with CDC 3.1.1?

I'd like to, but it hasn't been merged into master yet.

loserwang1024, May 30 '24 01:05

@loserwang1024 Could you cherry-pick this commit to release-3.1 branch? Thanks

PatrickRen, Jun 04 '24 08:06