seatunnel
[Discuss][Core] How to stay compatible with newer Flink versions at the API level
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
This is a subtask of #2927.
Later versions of Flink no longer allow the Configuration to be set directly: the Configuration is unmodifiable. I think constructors are more stable, so I want to set the configuration via the constructor.
SeaTunnel Version
dev
SeaTunnel Config
-
Running Command
-
Error Exception
java.lang.UnsupportedOperationException: The configuration is unmodifiable; its contents cannot be changed.
Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Flink 1.14.x and later cannot modify the Configuration while a task is running, so we cannot change pipeline.jars, pipeline.classpath, etc. once the task state is running. The buggy code is configuration.set(PipelineOptions.JARS, jars.stream().distinct().collect(Collectors.toList())); in FlinkEnvironment.registerPlugin. So I recommend using constructors instead. Is there a better way? Please discuss here.
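A minimal sketch of the constructor idea, using only the JDK so it runs without Flink on the classpath. SketchEnvironment and createEnvironment are hypothetical stand-ins, not Flink or SeaTunnel classes; the unmodifiable map only imitates the read-only Configuration described above. The point is that the jar list is deduplicated and fixed before the environment is constructed, so nothing has to call set on the configuration afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ConstructorConfigSketch {

    /** Hypothetical stand-in for an environment whose configuration is read-only once built. */
    static final class SketchEnvironment {
        private final Map<String, List<String>> configuration;

        SketchEnvironment(Map<String, List<String>> initial) {
            // Copy the values, then expose only an unmodifiable view.
            Map<String, List<String>> copy = new HashMap<>();
            for (Map.Entry<String, List<String>> e : initial.entrySet()) {
                copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
            }
            this.configuration = Collections.unmodifiableMap(copy);
        }

        Map<String, List<String>> getConfiguration() {
            return configuration;
        }
    }

    /** Builds the full configuration first, then hands it to the constructor. */
    static SketchEnvironment createEnvironment(List<String> pluginJars) {
        Map<String, List<String>> conf = new HashMap<>();
        // Same deduplication as in the snippet quoted above, done before construction.
        conf.put("pipeline.jars", pluginJars.stream().distinct().collect(Collectors.toList()));
        return new SketchEnvironment(conf);
    }

    public static void main(String[] args) {
        SketchEnvironment env = createEnvironment(
                Arrays.asList("file:///a.jar", "file:///a.jar", "file:///b.jar"));
        System.out.println(env.getConfiguration().get("pipeline.jars"));
        try {
            // A late modification fails, like the exception reported in this issue.
            env.getConfiguration().put("pipeline.jars", new ArrayList<String>());
        } catch (UnsupportedOperationException e) {
            System.out.println("late modification rejected");
        }
    }
}
```

In the real code the equivalent step would be to collect the plugin jars before the Flink environment is created and pass the finished Configuration in at creation time; how exactly that is wired into FlinkEnvironment is left to the PR.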
@ashulin @EricJoy2048 @TyrantLucifer @ic4y please give some suggestions.
+1. Looking forward to your PR!
+1
After I upgraded Flink to 1.14.x, the job ran normally in the development tool, but an error was reported after packaging. Has anyone had the same problem?
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Flink job executed failed
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.seatunnel.core.starter.exception.CommandExecuteException: Flink job executed failed
at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:57)
at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:40)
at org.apache.seatunnel.core.starter.flink.SeatunnelFlink.main(SeatunnelFlink.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
Caused by: org.apache.seatunnel.core.starter.exception.TaskExecuteException: Execute Flink job error
at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:93)
at org.apache.seatunnel.core.starter.flink.command.FlinkApiTaskExecuteCommand.execute(FlinkApiTaskExecuteCommand.java:55)
... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 84eeb69eb8db0eb0e714a43f522f13ff)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1916)
at org.apache.seatunnel.core.starter.flink.execution.FlinkExecution.execute(FlinkExecution.java:91)
... 16 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 84eeb69eb8db0eb0e714a43f522f13ff)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$26(RestClusterClient.java:698)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:403)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-85fca91a-d1ee-4506-8492-22973e0a0b2f/job_84eeb69eb8db0eb0e714a43f522f13ff/blob_p-a2b78d662d77cfacbc3e9b6def6bbc11c7581a02-1401c2fae014ec13a4796ca4a7605eeb' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:338)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.
Caused by: java.lang.ClassNotFoundException: org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
Maybe you didn't load the connector.
In the ${SEATUNNEL_HOME}/connectors/seatunnel folder, there is connector-fake-2.1.3-SNAPSHOT.jar
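One way to confirm that a jar in that folder really contains the class named in the ClassNotFoundException is a small check like the one below. It is only a diagnostic sketch using the JDK; ConnectorJarCheck and jarsContaining are hypothetical names, not part of SeaTunnel. If the class is found, the jar itself is fine and the problem is more likely that the jar was never registered with the job.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConnectorJarCheck {

    /** Returns the jars directly under connectorDir that contain the given class. */
    static List<Path> jarsContaining(Path connectorDir, String className) throws IOException {
        // A class a.b.C is stored in a jar as the entry a/b/C.class.
        String entryName = className.replace('.', '/') + ".class";

        List<Path> candidates;
        try (Stream<Path> files = Files.list(connectorDir)) {
            candidates = files
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }

        List<Path> hits = new ArrayList<>();
        for (Path candidate : candidates) {
            try (JarFile jar = new JarFile(candidate.toFile())) {
                if (jar.getJarEntry(entryName) != null) {
                    hits.add(candidate);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: ConnectorJarCheck <connector-dir> <class-name>");
            return;
        }
        System.out.println(jarsContaining(Paths.get(args[0]), args[1]));
    }
}
```

For this thread the arguments would be the connectors/seatunnel folder under the SeaTunnel home directory and org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource.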
My guess is that this happens because the configuration can no longer be modified through SetConfiguration.
Please wait for my PR.
I modified SetConfiguration according to your method.
Please try again.