incubator-streampark
[Bug] Flink native K8s application mode fails to recover from S3 (s3p) savepoint
Search before asking
- [X] I had searched in the issues and found no similar issues.
Java Version
1.8
Scala Version
2.12.x
StreamPark Version
2.1.1
Flink Version
1.17.1
deploy mode
kubernetes-application
What happened
Flink failed to recover from a savepoint that StreamPark had saved automatically. The logs show that, when StreamPark restored the Flink job, the savepoint path it submitted was `s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f`.
The Flink application logs show that Flink hit an error while restoring from savepoint `s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f`.
Submitting the job manually through the regular Flink CLI with the same savepoint path `s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f` failed with the same error.
However, after changing the savepoint path to `s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f/_metadata`, both the CLI submission and the StreamPark submission succeed.
Error Exception
The error log of Flink is as follows:
2023-09-02 02:57:38,909 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.
2023-09-02 02:57:38,912 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Starting job ff31d8dfb89d0a2e3fdf618617af15bc from savepoint s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f ()
2023-09-02 02:57:39,360 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job ff31d8dfb89d0a2e3fdf618617af15bc reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
... 3 more
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 3 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113)
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
2023-09-02 02:57:39,395 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job ff31d8dfb89d0a2e3fdf618617af15bc has been registered for cleanup in the JobResultStore after reaching a terminal state.
2023-09-02 02:57:40,108 WARN org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly:
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_382]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_382]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_382]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_780559f2-1c11-4714-a94e-f6aa4acd958f.jar:1.17.1]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_382]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_382]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_382]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_382]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
... 13 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
... 12 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:938) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
... 12 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'Platform-Flink-Test-security-Log'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
at com.entercom.china.itsdsi.security.iceberg.ProbeLogToIceberg.main(ProbeLogToIceberg.java:1389) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.1.jar:1.17.1]
... 12 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.EOFException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_382]
at org.apache.flink.runtime.checkpoint.Checkpoints.loadCheckpointMetadata(Checkpoints.java:113) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:149) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1849) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:223) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:198) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:210) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:136) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:371) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:348) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
2023-09-02 02:57:40,132 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
2023-09-02 02:57:40,145 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-09-02 02:57:40,145 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
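The innermost cause in every trace above is `java.io.EOFException` thrown by `DataInputStream.readInt`, called from `Checkpoints.loadCheckpointMetadata`. `readInt` needs four bytes, so this exception means the stream Flink opened as the savepoint metadata ended before a 4-byte integer could be read, which suggests the object it read was empty or truncated rather than a complete `_metadata` file. The sketch below only reproduces that exception class with plain JDK streams; `EmptyMetadataDemo` and `tryReadInt` are illustrative names, and the sketch makes no claim about how the Presto-based `s3p` filesystem actually resolves the savepoint directory.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class EmptyMetadataDemo {

    /** Tries to read one 4-byte int, the same JDK call that fails in the trace. */
    static String tryReadInt(byte[] content) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) {
            return "read int " + in.readInt();
        } catch (EOFException e) {
            // Fewer than 4 bytes were available, so DataInputStream.readInt throws EOFException.
            return "EOFException";
        } catch (IOException e) {
            return "IOException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // An empty object, e.g. a zero-length entry opened as if it were the metadata file.
        System.out.println(tryReadInt(new byte[0]));
        // Four big-endian bytes are enough for readInt to succeed.
        System.out.println(tryReadInt(new byte[] {0, 0, 0, 42}));
    }
}
```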
Screenshots
Are you willing to submit PR?
- [x] Yes, I am willing to submit a PR! (Are you willing to contribute this PR?)
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thank you for the detailed feedback. We will work quickly to identify and fix this bug. 💪🔧
The two screenshots above are from the official Flink 1.17.1 documentation.
In actual testing, the savepoint path in Figure 2 only works when written in format 2 from Figure 1 (the path ending in `/_metadata`, as in the workaround above); format 1 from Figure 1 fails, and that is the cause of the current error.
After reviewing the source code, my understanding of the process is as follows:
First, a savepoint is triggered, and StreamPark obtains the resulting savepoint path from Flink and writes it to the database. Second, when restoring from the savepoint, StreamPark reads the savepoint path from the database.
So, to fix this issue, should we convert the path to format 2 from Figure 1 at the moment we obtain it from Flink, so that code elsewhere does not need to change?
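A minimal Java sketch of that proposal, assuming the two-step flow described above. Everything here is hypothetical: `SavepointFlowSketch`, `toMetadataPath`, `onSavepointCompleted` and `savepointForRestore` are not StreamPark APIs, and a `HashMap` stands in for the database table. The path is rewritten once, when Flink reports it, so the restore step reads the stored value and passes it through unchanged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the savepoint flow described above; not StreamPark code. */
public class SavepointFlowSketch {

    private static final String METADATA_FILE = "_metadata";

    // Stands in for the database table where the savepoint path is recorded.
    private final Map<Long, String> savepointTable = new HashMap<>();

    /** Appends "/_metadata" unless the path already points at the metadata file. */
    static String toMetadataPath(String savepointPath) {
        String trimmed = savepointPath.endsWith("/")
                ? savepointPath.substring(0, savepointPath.length() - 1)
                : savepointPath;
        return trimmed.endsWith("/" + METADATA_FILE) ? trimmed : trimmed + "/" + METADATA_FILE;
    }

    /** Step 1: Flink reported a completed savepoint; normalize the path before storing it. */
    void onSavepointCompleted(long appId, String pathReportedByFlink) {
        savepointTable.put(appId, toMetadataPath(pathReportedByFlink));
    }

    /** Step 2: on restore, read the stored path back unchanged. */
    Optional<String> savepointForRestore(long appId) {
        return Optional.ofNullable(savepointTable.get(appId));
    }

    public static void main(String[] args) {
        SavepointFlowSketch flow = new SavepointFlowSketch();
        flow.onSavepointCompleted(1L,
                "s3p://lakehouse/flink/sp/Platform-Flink-Test-Security-Log/savepoint-2b3ed0-f0c7ba51791f");
        System.out.println(flow.savepointForRestore(1L).orElse("none"));
    }
}
```

Because the rewrite happens at write time, any existing code that assumes the stored value is a savepoint directory rather than a file would need to be checked, and the rule would still need validating across the Flink versions StreamPark supports.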
Thank you for providing the information. We encourage you to fix this bug. How about it? 💪 We need to test and determine the savepoint path rules across different Flink versions (1.12 to 1.17). We warmly welcome you to fix this bug, and we believe you can do it! 👍😊
I am willing to fix this bug and am currently reading the relevant code, but given my limited experience it may take some time.
After reviewing the relevant source code and the official Flink documentation, I believe the correct savepoint format is format 1 in the screenshot.
So I think this is a problem in Flink, not StreamPark. To verify this, I ran the following tests:
Flink version: 1.17.1
- Savepoint stored on HDFS, restored using format 1 from the screenshot: succeeded
- Savepoint stored on S3 via the s3a scheme, restored using format 1: succeeded
- Savepoint stored on S3 via the s3p scheme, restored using format 1: failed
- Savepoint stored on S3 via the s3p scheme, restored using format 2: succeeded
In summary, restoring a Flink savepoint behaves unexpectedly when the savepoint is stored on S3 and accessed through the s3p scheme.
If this is intended Flink behavior, StreamPark may need special handling for this scenario. If it is not, the bug should probably be reported to the Flink community.
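If StreamPark does add scenario-specific handling, one conservative option is to rewrite the path only for the `s3p` scheme, the one configuration that failed in the tests above, and leave HDFS and `s3a` paths exactly as Flink reported them. This is a sketch only: `S3pSavepointPaths` and `forRestore` are hypothetical names, and the scheme is taken from the text before `://` with a simple string split rather than `java.net.URI`, to avoid URI parsing errors on unescaped characters.

```java
import java.util.Locale;

/** Hypothetical helper, not part of StreamPark or Flink. */
public final class S3pSavepointPaths {

    private S3pSavepointPaths() {}

    /** Returns the scheme (text before "://"), lower-cased, or "" if there is none. */
    static String scheme(String path) {
        int idx = path.indexOf("://");
        return idx < 0 ? "" : path.substring(0, idx).toLowerCase(Locale.ROOT);
    }

    /**
     * For the s3p (Presto S3) scheme only, point the restore at the _metadata file;
     * paths with any other scheme are returned unchanged.
     */
    static String forRestore(String savepointPath) {
        if (!"s3p".equals(scheme(savepointPath)) || savepointPath.endsWith("/_metadata")) {
            return savepointPath;
        }
        String dir = savepointPath.endsWith("/")
                ? savepointPath.substring(0, savepointPath.length() - 1)
                : savepointPath;
        return dir + "/_metadata";
    }

    public static void main(String[] args) {
        System.out.println(forRestore("s3p://lakehouse/flink/sp/job/savepoint-2b3ed0-f0c7ba51791f"));
        System.out.println(forRestore("s3a://lakehouse/flink/sp/job/savepoint-2b3ed0-f0c7ba51791f"));
        System.out.println(forRestore("hdfs:///flink/sp/job/savepoint-2b3ed0-f0c7ba51791f"));
    }
}
```

A plain `s3://` path can be served by either the Presto or the Hadoop S3 plugin, depending on which one is installed, so a check on `s3p` alone would not cover that case.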
@wolfboys
Sorry for the slow reply. Based on your description, we preliminarily suspect that this may be a bug in Flink. We need further confirmation; if it is confirmed, we can report it to the Flink community.
