
MySQL-CDC to Doris

Open tracyliuzw opened this issue 1 year ago • 4 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

During testing, the job failed at the very end of the MySQL-CDC historical data synchronization. The job aborts with the NullPointerException shown under Error Exception. This looks like either a crash in the final step of the historical synchronization or a loss of historical data during that last step.

SeaTunnel Version

2.3.3

SeaTunnel Config

env {
  # You can set flink configuration here
  execution.parallelism = 10
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  MySQL-CDC {
    incremental.parallelism = 10
    server-id = 6002
    username = "doris"
    password = "**************"
    database-names = ["ad_stat_data"]
    table-names = ["ad_stat_data.t_presentee_stat_world_112001"]
    base-url = "jdbc:mysql://10.246.50.113:3310/ad_stat_data"
    startup.mode = "initial"
    snapshot.split.size = 8096
    snapshot.fetch.size = 5000
  }
}

sink {
  Doris {
    fenodes = "10.246.98.111:8030"
    username = root
    password = "Bigdata@igg123456"
    table.identifier = "ad_stat_data.t_presentee_stat_world_112001"
    sink.enable-2pc = "true"
    sink.label-prefix = "json"
    sink.enable-delete = true
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      merge_type = "MERGE"
      delete = "__DORIS_DELETE_SIGN__=1"
      enable_profile = "true"
    }
  }
}

Running Command

/usr/local/seatunnel/bin/seatunnel.sh -cn seatunnel-igg -n 112001Tokafka -c 112001Tokafka.conf  --async true

Error Exception

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error, 

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues

2023-10-16 03:22:27,417 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed 

2023-10-16 03:22:27,418 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:190)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:223)
	at org.apache.seatunnel.engine.server.task.context.SourceReaderContext.sendSplitRequest(SourceReaderContext.java:64)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.onSplitFinished(IncrementalSourceReader.java:140)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.finishCurrentFetch(SourceReaderBase.java:204)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.moveToNextSplit(SourceReaderBase.java:180)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:161)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:112)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:105)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:110)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:121)
	at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolveAndThrowIfException(InvocationFuture.java:100)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:617)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.requestSplit(SourceFlowLifeCycle.java:220)
	... 19 more
Caused by: java.lang.NullPointerException
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.lambda$createIncrementalSplit$4(IncrementalSplitAssigner.java:203)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
	at java.util.HashMap$ValueSpliterator.forEachRemaining(HashMap.java:1652)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.createIncrementalSplit(IncrementalSplitAssigner.java:205)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.createIncrementalSplits(IncrementalSplitAssigner.java:193)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSplitAssigner.getNext(IncrementalSplitAssigner.java:102)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.HybridSplitAssigner.getNext(HybridSplitAssigner.java:105)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.assignSplits(IncrementalSourceEnumerator.java:160)
	at org.apache.seatunnel.connectors.cdc.base.source.enumerator.IncrementalSourceEnumerator.handleSplitRequest(IncrementalSourceEnumerator.java:81)
	at org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask.requestSplit(SourceSplitEnumeratorTask.java:231)
	at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.lambda$run$0(RequestSplitOperation.java:62)
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
	at org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation.run(RequestSplitOperation.java:52)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:471)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:197)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:137)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
	at ------ submitted from ------.()
	at com.hazelcast.internal.util.ExceptionUtil.cloneExceptionWithFixedAsyncStackTrace(ExceptionUtil.java:336)
	at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.returnOrThrowWithGetConventions(InvocationFuture.java:112)
	... 22 more

	at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:183)
	... 2 more
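
For readers tracing the innermost "Caused by", the frames show the NullPointerException being thrown from a lambda inside createIncrementalSplit while a stream over HashMap values is being collected. The sketch below reproduces that general failure pattern in isolation. It is not the SeaTunnel code: the SplitInfo class, its highWatermark field, and both helper methods are hypothetical names chosen for illustration, and the assumption that a missing (null) per-split value is what gets dereferenced is only a guess at the class of bug, not a confirmed diagnosis.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class NullInStreamSketch {

    // Hypothetical stand-in for per-split state; not a SeaTunnel class.
    static final class SplitInfo {
        final String splitId;
        final Long highWatermark; // assumed to be null when it was never reported

        SplitInfo(String splitId, Long highWatermark) {
            this.splitId = splitId;
            this.highWatermark = highWatermark;
        }
    }

    // Fails the same way as the trace: the lambda unboxes a null Long
    // while collect() drains the HashMap values.
    static List<Long> unsafeOffsets(Map<String, SplitInfo> splits) {
        return splits.values().stream()
                .filter(s -> s.highWatermark > 0L) // NullPointerException on a null Long
                .map(s -> s.highWatermark)
                .collect(Collectors.toList());
    }

    // Defensive variant: skip null entries and null fields before using them.
    static List<Long> safeOffsets(Map<String, SplitInfo> splits) {
        return splits.values().stream()
                .filter(Objects::nonNull)
                .filter(s -> s.highWatermark != null && s.highWatermark > 0L)
                .map(s -> s.highWatermark)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, SplitInfo> splits = new HashMap<>();
        splits.put("split-0", new SplitInfo("split-0", 42L));
        splits.put("split-1", new SplitInfo("split-1", null)); // simulated missing value

        try {
            unsafeOffsets(splits);
        } catch (NullPointerException e) {
            System.out.println("unsafe variant threw " + e.getClass().getSimpleName());
        }
        System.out.println("safe variant returned " + safeOffsets(splits));
    }
}
```

Whether skipping the incomplete entry is the right behavior for a real split assigner is a separate question. If the missing value means a snapshot split never reported its completion, failing with a descriptive error may be preferable to silently dropping it.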

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

tracyliuzw, Oct 16 '23 08:10