doris-flink-connector
[Bug] 2PC commit is retried infinitely
Search before asking
- [X] I have searched the issues and found no similar issues.
Version
1.15, with sink.max-retries = 1
What's Wrong?
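The sink retries the 2PC (two-phase commit) indefinitely: every restarted task attempt fails again with "transaction [49734] not found", as the following log shows: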
2022-11-18 11:53:48,008 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 (019990f47bcb3edc3ef9a00232143186).
2022-11-18 11:53:48,008 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65212 019990f47bcb3edc3ef9a00232143186.
2022-11-18 11:53:49,013 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:49,013 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from CREATED to DEPLOYING.
2022-11-18 11:53:49,014 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) [DEPLOYING].
2022-11-18 11:53:49,014 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3d7d4ae1
2022-11-18 11:53:49,014 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:49,014 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:49,014 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:49,020 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,020 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:49,021 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:49,021 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
"status": "Fail",
"msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
2022-11-18 11:53:49,021 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 (0eedf3500794baa86ab2fdfcbefeb1c2).
2022-11-18 11:53:49,021 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65213 0eedf3500794baa86ab2fdfcbefeb1c2.
2022-11-18 11:53:50,025 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e), deploy into slot with allocation id e32293a602f95c9f67ad0f8e2c653365.
2022-11-18 11:53:50,026 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from CREATED to DEPLOYING.
2022-11-18 11:53:50,026 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) [DEPLOYING].
2022-11-18 11:53:50,026 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@3c9058a4
2022-11-18 11:53:50,026 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-11-18 11:53:50,026 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2022-11-18 11:53:50,026 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from DEPLOYING to INITIALIZING.
2022-11-18 11:53:50,032 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'currentEmitEventTimeLag'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,032 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'sourceIdleTime'. Metric will not be reported.[localhost, taskmanager, localhost:33197-e443eb, insert-into_default_catalog.default_database.doris_purchase, Source: purchase[1], 0]
2022-11-18 11:53:50,033 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Closing Source Reader.
2022-11-18 11:53:50,033 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: purchase[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> doris_purchase[3]: Writer -> doris_purchase[3]: Committer (1/1)#65214 (f1346df276ea9b9c68b8907169fe5e3e) switched from INITIALIZING to FAILED with failure cause: org.apache.doris.flink.exception.DorisRuntimeException: Commit failed {
"status": "Fail",
"msg": "errCode = 2, detailMessage = transaction [49734] not found"
}
at org.apache.doris.flink.sink.committer.DorisCommitter.commitTransaction(DorisCommitter.java:116)
at org.apache.doris.flink.sink.committer.DorisCommitter.commit(DorisCommitter.java:71)
at org.apache.flink.streaming.api.transformations.SinkV1Adapter$CommitterAdapter.commit(SinkV1Adapter.java:282)
at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:127)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
What You Expected?
The connector should not retry the 2PC commit indefinitely when the transaction can no longer be found in Doris BE.
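One way to get the expected behavior is for the committer to tell a non-retryable "transaction not found" response apart from transient errors, and to cap the remaining retries with max-retries. The Java sketch below is illustrative only: BoundedCommit, CommitResponse, Outcome and commitWithRetry are hypothetical names, not part of doris-flink-connector, and it assumes a successful commit reports status "Success". What the caller should do with TXN_NOT_FOUND (fail the job, skip the committable, raise an alert) is a policy decision the sketch deliberately leaves open.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch (not the connector's API): classify a Doris 2PC commit
 * response and bound retries instead of looping forever.
 */
public class BoundedCommit {

    /** Minimal stand-in for the parsed commit response {"status", "msg"}. */
    public static final class CommitResponse {
        final String status;
        final String msg;

        public CommitResponse(String status, String msg) {
            this.status = status;
            this.msg = msg;
        }
    }

    public enum Outcome { COMMITTED, TXN_NOT_FOUND, GAVE_UP }

    /** Assumption: "transaction [...] not found" can never succeed on retry. */
    static boolean isTxnNotFound(CommitResponse r) {
        return r.msg != null && r.msg.matches(".*transaction \\[\\d+\\] not found.*");
    }

    /** Calls commitCall at most maxRetries + 1 times. */
    public static Outcome commitWithRetry(Supplier<CommitResponse> commitCall, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            CommitResponse r = commitCall.get();
            if ("Success".equals(r.status)) {
                return Outcome.COMMITTED;
            }
            if (isTxnNotFound(r)) {
                // Non-retryable: stop immediately and let the caller decide.
                return Outcome.TXN_NOT_FOUND;
            }
            // Any other failure is treated as transient and retried.
        }
        return Outcome.GAVE_UP;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Outcome o = commitWithRetry(() -> {
            calls[0]++;
            return new CommitResponse("Fail",
                    "errCode = 2, detailMessage = transaction [49734] not found");
        }, 1);
        // prints: TXN_NOT_FOUND after 1 call(s)
        System.out.println(o + " after " + calls[0] + " call(s)");
    }
}
```

With this split, the error from the log stops after a single call, while a transient failure is retried at most max-retries times.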
How to Reproduce?
To reproduce, kill the Doris backend (BE) while Flink is sinking data into Doris.
Anything Else?
No response
Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I had a similar problem: I want the sink to stop retrying after a failed write, but setting the maximum number of retries has no effect.
// Note: a separate DorisExecutionOptions.Builder configured with setMaxRetries(2)
// was never passed to the sink, so it could not take effect; the options
// actually used are built inline below.
// `stream` is a placeholder for the job's DataStream<String>.
stream.addSink(DorisSink.sink(
        DorisReadOptions.builder().build(),
        DorisExecutionOptions.builder()
                //.setBatchSize(1)
                //.setBatchIntervalMs(0L)
                .setMaxRetries(1) // does not stop the endless retries
                .setStreamLoadProp(properties)
                .build(),
        DorisOptions.builder()
                .setFenodes(feNodes)
                .setTableIdentifier(tableIdentifier)
                .setUsername(username)
                .setPassword(password)
                .build()));
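In the log from the issue, task attempts #65212, #65213 and #65214 start about one second apart, and each fails inside CommitterOperator.initializeState, i.e. while restoring state and re-committing the same pending transaction. That suggests the loop is driven by Flink's job restart strategy rather than by the sink's own retry counter: when checkpointing is enabled and no restart strategy is configured, Flink falls back to a fixed-delay strategy with Integer.MAX_VALUE attempts. The model below is hypothetical (not connector or Flink code) and assumes, for illustration, that each task attempt makes up to max-retries + 1 commit calls; the total is then (restarts + 1) times (max-retries + 1), so a small max-retries cannot bound it while restarts are unlimited.

```java
/**
 * Hypothetical model (not connector or Flink code) of why a per-attempt retry
 * cap does not bound the total number of commit calls when Flink keeps
 * restarting the task and replaying the same pending commit on restore.
 */
public class RestartLoopModel {

    /** Counts commit calls for a commit that fails on every call. */
    public static long commitCalls(int maxRetries, int restartAttempts) {
        long calls = 0;
        // Attempt 0 is the first run; every restart restores from the last
        // checkpoint and tries to commit the same transaction again.
        for (int attempt = 0; attempt <= restartAttempts; attempt++) {
            for (int retry = 0; retry <= maxRetries; retry++) {
                calls++; // fails with "transaction [...] not found"
            }
            // Retries exhausted: exception, task FAILED, restart strategy decides.
        }
        return calls;
    }

    public static void main(String[] args) {
        // sink.max-retries = 1 with the restart strategy capped at 3 attempts
        System.out.println(commitCalls(1, 3)); // prints 8
    }
}
```

Capping restarts (for example restart-strategy: fixed-delay together with restart-strategy.fixed-delay.attempts in flink-conf.yaml) would make the job fail instead of looping, but it would not resolve the missing transaction itself.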
Hi, we ran into the same issue. Anything new?