doris-flink-connector

[fix][cdc] fix uid conflicts during multi-database synchronization.

Open vinlee19 opened this issue 9 months ago • 2 comments

Proposed changes

When working with multiple MySQL databases, it's common to have identically named tables with the same schema spread across different databases. If you try to synchronize these databases to Doris in a single job using the following command,

bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.6.0.jar \
    mysql-sync-database  \
    --mysql-conf hostname=127.0.0.1 \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name="multi.*" \
    --sink-conf fenodes=127.0.0.1:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=1 

you might encounter the following error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Hash collision on user-specified ID "customer_1". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.IllegalArgumentException: Hash collision on user-specified ID "customer_1". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.
        at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:185)
        at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:110)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:246)
        at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160)
        at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1024)
        at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56)
        at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
        at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189)
        at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:118)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
        at org.apache.doris.flink.tools.cdc.CdcTools.syncDatabase(CdcTools.java:151)
        at org.apache.doris.flink.tools.cdc.CdcTools.createMySQLSyncDatabase(CdcTools.java:74)
        at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

This PR aims to address this issue.
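
The collision above can be reproduced in isolation. The following is a minimal sketch in plain Java with no Flink dependency, and it is not the connector's actual code. It assumes the operator uid is derived from the table name alone, which is what the `"customer_1"` in the error message suggests. The database names `multi_1` and `multi_2` and the helpers `tableOnlyUid` and `qualifiedUid` are hypothetical, chosen only to match the `multi.*` pattern in the command.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: shows why table-name-only uids collide across databases. */
final class UidSketch {

    /** Hypothetical scheme: the uid is just the table name (database is ignored). */
    static String tableOnlyUid(String database, String table) {
        return table;
    }

    /** Hypothetical scheme: the uid is qualified with the source database name. */
    static String qualifiedUid(String database, String table) {
        return database + "." + table;
    }

    /** Returns every uid that occurs more than once, each reported once. */
    static List<String> duplicates(List<String> uids) {
        Set<String> seen = new HashSet<>();
        List<String> dups = new ArrayList<>();
        for (String uid : uids) {
            // Set.add returns false when the uid was already present.
            if (!seen.add(uid) && !dups.contains(uid)) {
                dups.add(uid);
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        // Two assumed databases matching "multi.*", each with a customer_1 table.
        String[][] tables = {{"multi_1", "customer_1"}, {"multi_2", "customer_1"}};
        List<String> plain = new ArrayList<>();
        List<String> qualified = new ArrayList<>();
        for (String[] t : tables) {
            plain.add(tableOnlyUid(t[0], t[1]));
            qualified.add(qualifiedUid(t[0], t[1]));
        }
        System.out.println("table-only duplicates: " + duplicates(plain));
        System.out.println("qualified duplicates:  " + duplicates(qualified));
    }
}
```

Qualifying the uid with the database name is one way to make it unique. Note that Flink matches restored state to operators by uid, so a job started with a new uid scheme may be unable to restore state from checkpoints taken under the old one.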

Issue Number: close #xxx

Problem Summary:

Describe the overview of changes.

Checklist(Required)

  1. Does it affect the original behavior: (Yes/No/I Don't know)
  2. Has unit tests been added: (Yes/No/No Need)
  3. Has document been added or modified: (Yes/No/No Need)
  4. Does it need to update dependencies: (Yes/No)
  5. Are there any changes that cannot be rolled back: (Yes/No)

Further comments

If this is a relatively large or complex change, kick off the discussion at [email protected] by explaining why you chose the solution you did and what alternatives you considered, etc...

vinlee19, May 08 '24 08:05

This may result in the inability to recover from previous checkpoints.

JNSimba, May 08 '24 09:05

"This may result in the inability to recover from previous checkpoints."

Thanks, I've already solved it.

vinlee19, May 08 '24 09:05