
[Bug] `paimon-s3-1.0.1.jar` and the Flink plugin `flink-s3-fs-hadoop-1.20.1.jar` conflict with each other, resulting in the shutdown of the S3 connection pool.

Open · yl-yue opened this issue 5 months ago · 1 comment

Search before asking

  • [x] I searched in the issues and found nothing similar.

Paimon version

1.0.1

Compute Engine

flink 1.20.1

Minimal reproduce step

The official documentation states that with flink-s3-fs-hadoop-1.20.1.jar, one can do without paimon-s3-1.0.1.jar. However, if paimon-s3-1.0.1.jar is not present in the Flink cluster, submitting the job fails with the following exception:

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.paimon.s3.S3Loader$S3PluginFileIO
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:414)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:169)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.paimon.s3.S3Loader$S3PluginFileIO
	at java.base/java.net.URLClassLoader.findClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:197)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Unknown Source)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
	at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(Unknown Source)
	at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:472)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:467)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:422)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400)
	... 9 more
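
The ClassNotFoundException above can be checked for independently of a job submission. The sketch below is an illustrative diagnostic (not part of Paimon or Flink) that probes whether the plugin class referenced by the serialized job graph is resolvable from the current classloader, which is exactly what fails in the trace:

```java
// Illustrative diagnostic: check whether a class that a job graph references
// can be resolved from this JVM's classpath. With paimon-s3-1.0.1.jar absent,
// resolving org.apache.paimon.s3.S3Loader$S3PluginFileIO fails, matching the
// ClassNotFoundException in the stack trace above.
public class S3ClassCheck {

    /** Returns true if className can be resolved, without initializing it. */
    public static boolean isResolvable(String className) {
        try {
            Class.forName(className, false, S3ClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(
            isResolvable("org.apache.paimon.s3.S3Loader$S3PluginFileIO"));
    }
}
```

Running this on a cluster node (with the same classpath the task managers see) shows whether the Paimon S3 jar is actually visible to user code.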

If both jars are present at the same time, a conflict in connection-pool management occurs, shutting down the S3 connection pool. The error message is as follows:

2025-07-18 02:26:12,044 INFO  org.apache.hadoop.fs.s3a.S3AInputStream                      [] - Switching to Random IO seek policy
2025-07-18 02:26:12,044 INFO  org.apache.hadoop.fs.s3a.S3AInputStream                      [] - Switching to Random IO seek policy
2025-07-18 02:26:12,064 INFO  org.apache.hadoop.fs.s3a.S3AInputStream                      [] - Switching to Random IO seek policy
2025-07-18 02:26:12,065 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 130 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1752805572006 for job 691f85ebe6821da0d5166daad0bf3662.
2025-07-18 02:26:12,074 INFO  org.apache.hadoop.fs.s3a.S3AInputStream                      [] - Switching to Random IO seek policy
2025-07-18 02:26:12,079 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 17 by task 8e8063ce0236468f4e6671bbf59a1de5_335e22609e0565c9df076a23c65241e4_0_15 of job 37decc981c91413e0323f5b6f0fc86e7 at 10.100.4.199:46485-809a61 @ 10-100-4-199.taskmanager.bd-flink.svc.cluster.local (dataPort=40097).
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 17 for operator Join[467] -> Calc[468] -> ConstraintEnforcer[469] -> Map (1/1)#15.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.20.1.jar:1.20.1]
	... 5 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
	at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not open output stream for state backend
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:462) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.util.ForwardingOutputStream.write(ForwardingOutputStream.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.io.DataOutputStream.write(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:222) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.serialize(BinaryRowDataSerializer.java:90) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:146) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34) ~[?:?]
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?]
	at com.amazonaws.http.conn.$Proxy40.requestConnection(Unknown Source) ~[?:?]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[?:?]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1378) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1183) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545) ~[?:?]
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerCreateFile(S3AFileSystem.java:1682) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$create$6(S3AFileSystem.java:1646) ~[?:?]
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1645) ~[?:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:452) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.util.ForwardingOutputStream.write(ForwardingOutputStream.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.io.DataOutputStream.write(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:222) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.serialize(BinaryRowDataSerializer.java:90) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:146) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
2025-07-18 02:26:12,080 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 17 for job 37decc981c91413e0323f5b6f0fc86e7. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1145) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$3(ExecutionGraphHandler.java:123) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 17 for operator Join[467] -> Calc[468] -> ConstraintEnforcer[469] -> Map (1/1)#15.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
	at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.io.IOException: Could not open output stream for state backend
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:462) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.util.ForwardingOutputStream.write(ForwardingOutputStream.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.io.DataOutputStream.write(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:222) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.serialize(BinaryRowDataSerializer.java:90) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:146) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalStateException: Connection pool shut down
	at org.apache.http.util.Asserts.check(Asserts.java:34) ~[?:?]
	at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[?:?]
	at jdk.internal.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
	at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[?:?]
	at com.amazonaws.http.conn.$Proxy40.requestConnection(Unknown Source) ~[?:?]
	at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[?:?]
	at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
	at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
	at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1378) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1183) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:838) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:805) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:779) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:735) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:717) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:581) ~[?:?]
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5593) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5540) ~[?:?]
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1422) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545) ~[?:?]
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) ~[?:?]
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerCreateFile(S3AFileSystem.java:1682) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$create$6(S3AFileSystem.java:1646) ~[?:?]
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[?:?]
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:1645) ~[?:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) ~[flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:2.8.3-10.0]
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:154) ~[?:?]
	at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.create(HadoopFileSystem.java:37) ~[?:?]
	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:452) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flushToFile(FsCheckpointStreamFactory.java:308) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:284) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.util.ForwardingOutputStream.write(ForwardingOutputStream.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.io.DataOutputStream.write(Unknown Source) ~[?:?]
	at org.apache.flink.table.data.binary.BinarySegmentUtils.copyToView(BinarySegmentUtils.java:222) ~[flink-table-api-java-uber-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer.serialize(BinaryRowDataSerializer.java:90) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48) ~[flink-table-runtime-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:146) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	... 4 more
2025-07-18 02:26:12,081 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Asynchronous task checkpoint failed., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:191) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:124) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2281) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1171) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$3(ExecutionGraphHandler.java:123) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.1.jar:1.20.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
2025-07-18 02:26:12,081 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 10 tasks will be restarted to recover from a global failure.
2025-07-18 02:26:12,081 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job etl_1927638472270520321 (37decc981c91413e0323f5b6f0fc86e7) switched from state RUNNING to RESTARTING.

What doesn't meet your expectations?

  1. Do both of these jars need to be present at the same time, or is only one of them required? The documentation states that one alone is sufficient.

    • /opt/flink/lib/extra/paimon-s3-1.0.1.jar
    • /opt/flink/plugins/extra/flink-s3-fs-hadoop-1.20.1.jar
  2. The S3 connection pool has been shut down. My speculation is that Flink checkpointing and Paimon's S3 FileIO share the same connection pool: after a Paimon S3 operation completes, the pool is closed, but Flink checkpointing is still using it, which causes the failure.
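
The speculated failure mode in point 2 can be sketched in a few lines. This is an assumption about the mechanism, not Paimon's actual code; `SharedPool` stands in for the Apache HttpClient `PoolingHttpClientConnectionManager` that both components would share:

```java
// Minimal sketch (hypothetical, not Paimon's actual implementation) of the
// suspected bug: two components hold references to one connection pool, and
// the first one closing it breaks the second.
class SharedPool {
    private boolean shutdown = false;

    /** Hand out a connection; fails once the pool has been closed. */
    void lease() {
        if (shutdown) {
            throw new IllegalStateException("Connection pool shut down");
        }
    }

    void close() { shutdown = true; }
}

public class PoolConflictSketch {
    public static void main(String[] args) {
        SharedPool pool = new SharedPool(); // shared by "Paimon" and "checkpointing"
        pool.lease();                       // Paimon S3 operation succeeds
        pool.close();                       // Paimon closes its FileIO, shutting the pool
        try {
            pool.lease();                   // Flink checkpoint still holds the same pool
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The exception message matches the `java.lang.IllegalStateException: Connection pool shut down` at the root of the checkpoint failure above.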

Anything else?

No response

Are you willing to submit a PR?

  • [ ] I'm willing to submit a PR!

yl-yue avatar Jul 31 '25 07:07 yl-yue

Perhaps the AWS SDK version pulled in by your local Hadoop dependency conflicts with Hadoop 3.3.4.

I have run Paimon 1.0.1 on Flink 1.17.1 with S3P checkpoints successfully. The Hadoop dependency of the flink-s3-fs-hadoop module is shaded; since my Flink deployment used Hadoop CDH 2.6, it conflicted with 3.3.4.

However, thanks to the relocation of the Hadoop dependency inside flink-s3-fs-hadoop-1.17.1.jar, I was able to run s3a/s3p against Hadoop 3.3.4 while reading and writing HDFS against Hadoop CDH 2.6.

YetiCuzMountain avatar Sep 01 '25 02:09 YetiCuzMountain