flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

[Bug] Mysql is synchronized to doris, and an error is reported when adding or deleting data after dropping column.

Open didiaode18 opened this issue 1 year ago • 2 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

1.18.1

Flink CDC version

3.0.0

Database and its version

mysql :5.6.24 doris :doris-2.0.0-alpha1

Minimal reproduce step

Data can be monitored normally and can be inserted, deleted, etc. normally, but once the data is re-inserted after deleting the field, the program will exit with an error. image

image The error message is as follows: 2024-02-06 16:05:39 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Caused by: java.lang.IllegalStateException: Column size does not match the data size at com.ververica.cdc.common.utils.Preconditions.checkState(Preconditions.java:160) at com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.serializerRecord(DorisEventSerializer.java:119) at com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.applyDataChangeEvent(DorisEventSerializer.java:98) at com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:69) at com.ververica.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:47) at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:96) at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) at com.ververica.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:154) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:834)

What did you expect to see?

After deleting the field and reinserting the data, the program will not report an error.

What did you see instead?

flink program failed

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

didiaode18 avatar Feb 07 '24 08:02 didiaode18

Please assign this task to me

didiaode18 avatar Feb 08 '24 00:02 didiaode18

Please assign this task to me @leonardBang

didiaode18 avatar Feb 21 '24 03:02 didiaode18

Do the log messages contain any errors related to DorisRuntimeException? If so, you might need to ask for help from Apache Doris community.

vinlee19 avatar Mar 09 '24 14:03 vinlee19

Closing this issue as it has been migrated to Apache Jira.

PatrickRen avatar Apr 09 '24 05:04 PatrickRen