doris-flink-connector

[Bug] 2PC commit is retried infinitely

Status: Open. rafael81 opened this issue 2 years ago • 3 comments

Search before asking

  • [X] I have searched the issues and found no similar issues.

Version

1.15, with sink.max-retries = 1

What's Wrong?

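The sink retries the two-phase commit (2PC) indefinitely, even though sink.max-retries is set to 1. After the task fails with "transaction [49734] not found", Flink restarts it, the commit of the same transaction fails again, and the cycle never ends (note the task attempt numbers #65212, #65213 and #65214 in the log below).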


2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 (019990f47bcb3edc3ef9a00232143186).
2022-11-18 11:53:48,008 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 019990f47bcb3edc3ef9a00232143186.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from CREATED to DEPLOYING.
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) [DEPLOYING].
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3d7d4ae1
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:49,014 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:49,014 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,020 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,021 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:49,021 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)

2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2).
2022-11-18 11:53:49,021 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 0eedf3500794baa86ab2fdfcbefeb1c2.
2022-11-18 11:53:50,025 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from CREATED to DEPLOYING.
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) [DEPLOYING].
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3c9058a4
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:50,026 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:50,026 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,032 WARN  org.apache.flink.metrics.MetricGroup                         [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,033 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:50,033 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
    "status": "Fail",
    "msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
	at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
	at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:748)
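Reading the stack trace, the commit is issued from CommitterOperator.initializeState while the task restores its state, so every restart commits the same restored transaction 49734 and fails the same way. The sketch below is a stdlib-only simulation of that loop, not Flink or connector code; RestartLoopSimulation, simulate and its parameters are hypothetical. It assumes each task attempt makes at most maxRetries + 1 commit calls (whether the connector actually retries on a "Fail" response is not confirmed here) and that restarts are capped by a fixed-delay restart budget. If the job relies on Flink's default, which is a fixed-delay strategy with Integer.MAX_VALUE restart attempts when checkpointing is enabled and no strategy is configured, that budget is effectively unlimited.

```java
import java.util.function.IntPredicate;

/**
 * Hypothetical simulation (not Flink or connector code) of a committable that is
 * restored and committed again on every task restart.
 */
public class RestartLoopSimulation {

    /** Counts produced by one simulation run. */
    public static final class Result {
        public final int taskAttempts;
        public final int commitCalls;
        public final boolean committed;

        Result(int taskAttempts, int commitCalls, boolean committed) {
            this.taskAttempts = taskAttempts;
            this.commitCalls = commitCalls;
            this.committed = committed;
        }

        @Override
        public String toString() {
            return "taskAttempts=" + taskAttempts + ", commitCalls=" + commitCalls
                    + ", committed=" + committed;
        }
    }

    /**
     * @param maxRetries      assumed per-attempt commit retry limit (the role of sink.max-retries)
     * @param restartAttempts restart budget of an assumed fixed-delay restart strategy
     * @param commitSucceeds  given the 1-based index of a commit call, returns true if it succeeds
     */
    public static Result simulate(int maxRetries, int restartAttempts, IntPredicate commitSucceeds) {
        int taskAttempts = 0;
        int commitCalls = 0;
        // One initial task attempt plus up to restartAttempts restarts.
        while (taskAttempts <= restartAttempts) {
            taskAttempts++;
            // Restore: commit the restored transaction, with at most maxRetries retries.
            for (int retry = 0; retry <= maxRetries; retry++) {
                commitCalls++;
                if (commitSucceeds.test(commitCalls)) {
                    return new Result(taskAttempts, commitCalls, true);
                }
            }
            // Every call failed: the task fails and the restart strategy restarts it.
        }
        // Restart budget exhausted: the job finally fails.
        return new Result(taskAttempts, commitCalls, false);
    }

    public static void main(String[] args) {
        // "transaction [49734] not found" is deterministic, so every commit call fails.
        System.out.println(simulate(1, 5, call -> false));
    }
}
```

With maxRetries = 1 and a budget of 5 restarts, the failing commit runs in 6 task attempts and 12 commit calls without ever succeeding. Changing maxRetries only changes the number of calls per attempt; the number of attempts is set by the restart strategy.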

What You Expected?

The connector should not keep retrying the 2PC commit of a transaction that can no longer be found in Doris (BE).
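One way to express this expectation is to let the committer recognize the specific failure seen in the log. The helper below is a hypothetical sketch, not the connector's API: CommitResponseClassifier, Outcome and notFoundTransactionId are illustrative names, and it assumes the commit response has already been parsed into its status and msg fields and that any status other than "Fail" means success. It only separates "transaction [...] not found" from other failures; a real fix would still have to decide whether such a committable is skipped with a warning or fails the job without further restarts.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper (not part of doris-flink-connector) that separates a
 * "transaction not found" commit failure from other failures.
 */
public class CommitResponseClassifier {

    public enum Outcome { SUCCESS, TRANSACTION_NOT_FOUND, OTHER_FAILURE }

    // Matches the message seen in the log, e.g. "transaction [49734] not found".
    private static final Pattern TXN_NOT_FOUND =
            Pattern.compile("transaction \\[(\\d+)\\] not found");

    /** Assumption: any status other than "Fail" counts as success. */
    public static Outcome classify(String status, String msg) {
        if (!"Fail".equals(status)) {
            return Outcome.SUCCESS;
        }
        if (msg != null && TXN_NOT_FOUND.matcher(msg).find()) {
            // The transaction is gone, so committing it again after a restart cannot succeed.
            return Outcome.TRANSACTION_NOT_FOUND;
        }
        return Outcome.OTHER_FAILURE;
    }

    /** Extracts the transaction id from a "not found" message, if present. */
    public static OptionalLong notFoundTransactionId(String msg) {
        if (msg == null) {
            return OptionalLong.empty();
        }
        Matcher m = TXN_NOT_FOUND.matcher(msg);
        return m.find() ? OptionalLong.of(Long.parseLong(m.group(1))) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        String msg = "errCode = 2, detailMessage = transaction [49734] not found";
        System.out.println(classify("Fail", msg));       // TRANSACTION_NOT_FOUND
        System.out.println(notFoundTransactionId(msg));  // OptionalLong[49734]
    }
}
```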

How to Reproduce?

Kill the Doris backend (BE) while Flink is sinking data into Doris.

Anything Else?

No response

Are you willing to submit PR?

  • [X] Yes, I am willing to submit a PR!

Code of Conduct

rafael81, Nov 18 '22

I ran into a similar problem: I want the sink to stop retrying after a failed write, but setting the maximum number of retries has no effect.

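// Note: this builder is never built or passed to DorisSink.sink(...) below,
// so setMaxRetries(2) here does not affect the sink.
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setMaxRetries(2);

// properties, feNodes, tableIdentifier, username and password are defined elsewhere.
DorisSink.sink(
        DorisReadOptions.builder().build(),
        DorisExecutionOptions.builder()
                //.setBatchSize(1)
                //.setBatchIntervalMs(0L)
                .setMaxRetries(1) // the retry limit the sink actually receives
                .setStreamLoadProp(properties)
                .build(),
        DorisOptions.builder()
                .setFenodes(feNodes)
                .setTableIdentifier(tableIdentifier)
                .setUsername(username)
                .setPassword(password)
                .build()
);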

doubly-yi, Dec 14 '22

Hi, we ran into the same issue. Are there any updates?

link3280, Apr 28 '23