[Bug] [Zeta] Aggregate commit error

Status: Open • inkkim opened this issue 1 year ago • 4 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

When I run a job with a JDBC source (Oracle, PostgreSQL) and an SFTP sink (Parquet format), I get the error below.

While the source and sink are running, checkpoints are triggered at regular intervals, but they keep cycling between the pending and waiting states. The error appears to occur during the final aggregated commit. The behavior is the same in cluster mode and in local mode. The temporary output directory looks like this:

root@ubuntu-2004:/tmp/seatunnel/seatunnel/799451662937751553/09396c8f64/T_799451662937751553_09396c8f64_0_1/NON_PARTITION# ls -alh
total 1.8G
drwxrwxr-x 2 ink ink 4.0K Jan 16 10:38 .
drwxrwxr-x 3 ink ink 4.0K Jan 16 10:35 ..
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_0.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_1.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_2.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_3.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_4.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_5.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:39 T_799451662937751553_09396c8f64_0_1_6.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_7.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_8.parquet
-rw-rw-r-- 1 ink ink 184M Jan 16 10:40 T_799451662937751553_09396c8f64_0_1_9.parquet

SeaTunnel Version

SeaTunnel 2.3.3

SeaTunnel Config

# /usr/local/seatunnel/config/seatunnel.yaml
seatunnel:
  engine:
    history-job-expire-minutes: 1440
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 600000
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /dev/
          storage.type: hdfs
          fs.defaultFS: hdfs://namenode:8020 # Ensure that the directory has write permission


# /usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet

env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 3
  job.mode = "BATCH"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  Jdbc {
    url = "jdbc:oracle:thin:@oracle-server:19070:XE"
    driver = "oracle.jdbc.OracleDriver"
    user = "user"
    password = "StrongP@ssword"
    query = "select * from USER.TEST_ITEM"
    result_table_name = "TEST_ITEM"
  }
}

sink {
  SftpFile {
    path = "/tmp/sink/oracle2parquet"
    host = "sftp-host"
    port = 22
    user = "ink"
    password = "StrongP@ssword"
    file_format_type = "parquet"
    source_table_name = "TEST_ITEM"
  }
}

Running Command

/usr/local/seatunnel/bin/seatunnel.sh --config /usr/local/seatunnel/config/v2.batch.config.jdbc-oracle-sftp-parquet

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Checkpoint notify complete failed
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:255)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:251)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.notifyCompleted(CheckpointCoordinator.java:328)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:736)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:480)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
	at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.notifyCheckpointComplete(SinkAggregatedCommitterTask.java:300)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:175)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:139)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun(OperationThread.java:123)
	at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)

	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.biRelay(CompletableFuture.java:1300)
	at java.util.concurrent.CompletableFuture$BiRelay.tryFire(CompletableFuture.java:1284)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.onComplete(AbstractInvocationFuture.java:1243)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1234)
	at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.completeExceptionally(Invocation.java:680)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyThrowable(Invocation.java:386)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:330)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:230)
	at com.hazelcast.spi.impl.operationservice.Operation.sendResponse(Operation.java:483)

Zeta or Flink or Spark Version

Zeta Cluster Mode (2 instances)

Java or Scala Version

OpenJDK Runtime Environment (build 1.8.0_392-8u392-ga-1~20.04-b08)

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

inkkim, Jan 17 '24 02:01

I can't tell the cause of the aggregated committer failure from these logs. Are there any more logs?

EricJoy2048, Jan 19 '24 07:01

From the job config, I see that you did not configure partition_column and partition_num, so the source reads all of the data as a single split. If that split cannot be read completely within 600000 ms (the configured checkpoint timeout), the checkpoint will time out.
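For illustration, a minimal sketch of the reporter's Jdbc source with partitioning enabled. It assumes TEST_ITEM has a numeric column usable for range splitting; the column name ID is hypothetical, and partition_num = 3 simply mirrors execution.parallelism = 3. Note that, per the reporter's later reply, this change alone did not resolve the failure in their case.

```hocon
source {
  Jdbc {
    url = "jdbc:oracle:thin:@oracle-server:19070:XE"
    driver = "oracle.jdbc.OracleDriver"
    user = "user"
    password = "StrongP@ssword"
    query = "select * from USER.TEST_ITEM"
    # Hypothetical numeric column used to split the query into ranges
    partition_column = "ID"
    # Number of splits; 3 here just matches execution.parallelism = 3
    partition_num = 3
    result_table_name = "TEST_ITEM"
  }
}
```

With several smaller splits instead of one large split, each split is more likely to finish within the checkpoint timeout.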

EricJoy2048, Jan 19 '24 07:01

@EricJoy2048

Thanks for replying. I tried configuring the JDBC source as you suggested, but it didn't help. However, in one of your earlier replies to another issue (#5797), I saw that tmp_path needs to be set in the SFTP sink settings. After setting it, the job completed successfully.
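As a sketch of the workaround described above, this is the reporter's SftpFile sink with tmp_path added. The directory value is a hypothetical example (the thread does not state which value was used); placing it next to path on the SFTP host is an arbitrary choice for illustration, and the assumption is only that the SFTP user can write to it.

```hocon
sink {
  SftpFile {
    path = "/tmp/sink/oracle2parquet"
    # Hypothetical directory for in-progress files before the aggregated commit
    tmp_path = "/tmp/sink/seatunnel_tmp"
    host = "sftp-host"
    port = 22
    user = "ink"
    password = "StrongP@ssword"
    file_format_type = "parquet"
    source_table_name = "TEST_ITEM"
  }
}
```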

Thank you again

inkkim, Jan 23 '24 06:01

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot], Feb 23 '24 00:02

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.

github-actions[bot], Mar 14 '24 00:03