[Bug] [Zeta] Aggregate commit error
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
When I run a job from a JDBC source (Oracle, PostgreSQL) to an SFTP sink (parquet format), I get this error.
While the source and sink are running, checkpoints are attempted at regular intervals, but they keep cycling between the pending and waiting states. The error appears to occur during the final aggregated commit. It happens in both cluster mode and local mode.
root@ubuntu-2004:/tmp/seatunnel/seatunnel/799451662937751553/09396c8f64/T_799451662937751553_09396c8f64_0_1/NON_PARTITION# ls -alh
total 1.8G
drwxrwxr-x 2 ink ink 4.0K Jan 16 10:38 .
drwxrwxr-x 3 ink ink 4.0K Jan 16 10:35 ..
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_0.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_1.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_2.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_3.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_4.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_5.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_6.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_7.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_8.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_9.parquet
SeaTunnel Version
SeaTunnel 2.3.3
SeaTunnel Config
# /usr/local/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 600000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /dev/
          storage.type: hdfs
          fs.defaultFS: hdfs://namenode:8020 # Ensure that the directory has write permission
# /usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet
env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 3
  job.mode = "BATCH"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is an example source plugin **only for test and demonstrate the feature source plugin**
  Jdbc {
    url = "jdbc:oracle:thin:@oracle-server:19070:XE"
    driver = "oracle.jdbc.OracleDriver"
    user = "user"
    password = "StrongP@ssword"
    query = "select * from USER.TEST_ITEM"
    result_table_name = "TEST_ITEM"
  }
}
sink {
  SftpFile {
    path = "/tmp/sink/oracle2parquet"
    host = "sftp-host"
    port = 22
    user = ink
    password = "StrongP@ssword"
    file_format_type = "parquet"
    source_table_name = "TEST_ITEM"
  }
}
Running Command
/usr/local/seatunnel/bin/seatunnel.sh --config /usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet
Error Exception
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint notify complete failed
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:255)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.notifyCompleted(CheckpointCoordinator.java:328)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:736)
at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:480)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.notifyCheckpointComplete(SinkAggregatedCommitterTask.java:300)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300)
at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)
Zeta or Flink or Spark Version
Zeta Cluster Mode (2 instances)
Java or Scala Version
OpenJDK Runtime Environment (build 1.8.0_392-8u392-ga-1~20.04-b08)
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I can't tell the reason for the aggregated committer failure from these logs. Are there any more logs?
From the job config, I see you have not configured partition_column and partition_num, so the source will read all the data as a single split. If that split cannot be read completely within 600000 ms (the configured checkpoint timeout), the checkpoint will time out.
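For reference, a source block with splitting enabled might look like the sketch below. The column name ID is an assumption (the table schema is not shown in this issue), and partition_num = 3 is chosen only to match execution.parallelism in the job config. As far as I recall, the JDBC source docs for this version expect partition_column to be a numeric column, so please check that against the docs for your release.

```hocon
source {
  Jdbc {
    url = "jdbc:oracle:thin:@oracle-server:19070:XE"
    driver = "oracle.jdbc.OracleDriver"
    user = "user"
    password = "StrongP@ssword"
    query = "select * from USER.TEST_ITEM"
    # Assumption: TEST_ITEM has a numeric column named ID; replace with a real column
    partition_column = "ID"
    # Assumption: number of splits, set here to match execution.parallelism = 3
    partition_num = 3
    result_table_name = "TEST_ITEM"
  }
}
```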
@EricJoy2048
Thanks for replying. I tried configuring the JDBC source as you suggested, but it didn't help. However, when I looked up your reply to another issue, I saw that tmp_path needs to be set in the SFTP sink settings, so I set it and the job completed successfully. #5797
Thank you again.
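For anyone who hits the same error, a sketch of the sink block with tmp_path added is below. The value "/tmp/sink/tmp" is only an illustrative assumption, not necessarily the path used here. As I understand the file sink behaviour, result files are written to the temporary path first and then moved to the target path on commit, so it presumably needs to be a directory that the SFTP user can write to.

```hocon
sink {
  SftpFile {
    path = "/tmp/sink/oracle2parquet"
    # Assumption: example value only; pick a directory the SFTP user can write to
    tmp_path = "/tmp/sink/tmp"
    host = "sftp-host"
    port = 22
    user = ink
    password = "StrongP@ssword"
    file_format_type = "parquet"
    source_table_name = "TEST_ITEM"
  }
}
```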
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.
This issue has been closed because it has not received response for too long time. You could reopen it if you encountered similar problems in the future.