[Question] SeaTunnel + Flink Engine: does it support Flink savepoint/last-state upgrade?
Hello SeaTunnel engineers,
I have a question about SeaTunnel running on the Flink engine: does it support Flink savepoint/last-state upgrades?
From my test: SeaTunnel 2.3.12 running on Flink 1.17 or 1.18 with Flink `upgradeMode: savepoint` or `last-state`. When upgrading the FlinkDeployment (with the Flink Kubernetes operator), I encounter the following error:
```
2025-12-10 10:44:00,103 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2025-12-10 10:44:00,104 INFO  org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
2025-12-10 10:44:00,107 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: Kafka-Source.
2025-12-10 10:44:00,108 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka-Source closed.
2025-12-10 10:44:00,111 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: Kafka-Source from checkpoint.
2025-12-10 10:44:00,221 WARN  org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext [] - Get flink job id failed
java.lang.IllegalStateException: Initialize flink job-id failed
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:152) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getFlinkJobId(FlinkSourceSplitEnumeratorContext.java:100) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.<init>(FlinkSourceSplitEnumeratorContext.java:57) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:116) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:48) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:444) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:406) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:155) ~[flink-dist-1.17.2.jar:1.17.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.whenComplete(Unknown Source) ~[?:?]
```
Thanks,
Tom
It seems like we cannot restore a SeaTunnel job from a Flink checkpoint/savepoint.
I have tested this part before. If it is not a CDC synchronization task, you need to change the mode to streaming mode; configure the rest normally. The state is then saved per split, and the job restores from the checkpoint when recovering. The same logic should apply if it is a CDC task.
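The per-split checkpoint/restore flow described above can be sketched as follows. This is a self-contained toy model of the snapshot/restore contract that Flink's source API follows (it is NOT the real `org.apache.flink.api.connector.source.SplitEnumerator` interface; the class and method names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the enumerator side of a source: it checkpoints its
// pending splits and is rebuilt from that persisted state after a restart.
class SketchEnumerator {
    private final List<String> pendingSplits;

    SketchEnumerator(List<String> pendingSplits) {
        this.pendingSplits = new ArrayList<>(pendingSplits);
    }

    // On each checkpoint, the returned list is what the engine would persist.
    List<String> snapshotState() {
        return new ArrayList<>(pendingSplits);
    }

    // On recovery, the enumerator is rebuilt from the persisted split state.
    static SketchEnumerator restore(List<String> checkpointedSplits) {
        return new SketchEnumerator(checkpointedSplits);
    }
}

public class SplitStateSketch {
    public static void main(String[] args) {
        SketchEnumerator original =
                new SketchEnumerator(List.of("info-partition-0", "info-partition-1"));
        List<String> checkpoint = original.snapshotState();               // persisted state
        SketchEnumerator restored = SketchEnumerator.restore(checkpoint); // after restart
        System.out.println(restored.snapshotState()); // [info-partition-0, info-partition-1]
    }
}
```

The point is only that restore should need nothing beyond the checkpointed split state itself; the error in this thread happens one step earlier, while constructing the enumerator context.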
@yzeng1618 First, really appreciate your input!
Here are the settings from my test above.
SeaTunnel job settings (it's a streaming job):
```
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  flink.execution.checkpointing.mode = "EXACTLY_ONCE"
  flink.execution.checkpointing.timeout = 600000
}

source {
  Kafka {
    plugin_output = "fake2"
    topic = info
    consumer.group = "testr"
    bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
  }
}

sink {
  Kafka {
    plugin_input = "fake2"
    topic = topc
    bootstrap.servers = "tom-cluster1-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
```
Then I run the SeaTunnel job on Flink (via the Flink Kubernetes operator) with a FlinkDeployment CR:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example-2
  namespace: kafka
spec:
  ......
  volumes:
    - name: seatunnel-config
      configMap:
        name: seatunnel-config
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-15-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: savepoint
```
The first time I `kubectl apply` the above YAML into the k8s cluster, the SeaTunnel job runs normally and the Flink checkpoints are saved periodically.
Then I make some changes to the YAML and apply it again. Because my Flink upgradeMode is savepoint, this counts as an upgrade (last-state, which restores from the last checkpoint, doesn't work either). The error above happens: the job can NOT restore from the last checkpoint or savepoint!
Please correct me if I'm wrong about this, thanks.
Tom
Appending some more exception logs that may help in figuring out the problem:
```
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.18.0.jar:1.18.0]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.18.0.jar:1.18.0]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "obj" is null
at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:142) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-fd7fca630f268e360191f42ba11014b3:2.3.12]
```
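Reading that trace, the failure shape looks like a reflective lookup inside `getJobIdForV15` that gets `null` back on the restore path and then calls a method on it. Here is a toy, self-contained reproduction of that failure shape (the field and class names are invented; I have not read the SeaTunnel source):

```java
import java.lang.reflect.Field;

// Toy reproduction of the pattern in the stack trace: a reflective field read
// returns null during restore, calling getClass() on it throws the NPE, and
// that NPE gets wrapped as "Initialize flink job-id failed".
public class JobIdLookupSketch {

    static class Context {
        Object jobIdHolder = null; // hypothetical: not populated on the restore path
    }

    static String getJobId(Context ctx) {
        try {
            Field f = Context.class.getDeclaredField("jobIdHolder");
            f.setAccessible(true);
            Object obj = f.get(ctx);
            // NPE here when obj is null, matching the logged
            // Cannot invoke "Object.getClass()" because "obj" is null
            return obj.getClass().getSimpleName();
        } catch (ReflectiveOperationException | NullPointerException e) {
            throw new IllegalStateException("Initialize flink job-id failed", e);
        }
    }

    public static void main(String[] args) {
        try {
            getJobId(new Context());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " / caused by "
                    + e.getCause().getClass().getSimpleName());
        }
    }
}
```

If that reading is right, the bug would be that whatever internal Flink object the job-id is reflected out of simply isn't there yet when the enumerator context is constructed during checkpoint restore.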
Another piece of info: for Flink, I use the RocksDB state backend and OSS as the savepoint/checkpoint storage.
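For reference, the Flink Kubernetes operator requires Flink HA to be enabled for last-state upgrades, and durable checkpoint/savepoint directories for both modes. A setup like mine would carry `flinkConfiguration` entries roughly like these (the OSS paths are placeholders, not my real buckets):

```yaml
flinkConfiguration:
  state.backend: rocksdb
  state.checkpoints.dir: oss://my-bucket/checkpoints   # placeholder path
  state.savepoints.dir: oss://my-bucket/savepoints     # placeholder path
  execution.checkpointing.interval: "60s"
  high-availability.type: kubernetes                   # needed for last-state upgrades
  high-availability.storageDir: oss://my-bucket/ha     # placeholder path
```

Even with all of that in place, the restore still fails as shown above, so the HA/storage setup does not seem to be the root cause here.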
The upgrade mode referred to here has not been tested, and its compatibility cannot be verified. I recommend trying the Zeta engine.
Thanks @yzeng1618 for your input on this.
I will give Zeta a try.
But I think there should be some official docs stating, for every SeaTunnel release, whether SeaTunnel supports restoring from Flink checkpoints/savepoints (and for which Flink versions), since the official docs say SeaTunnel supports the Flink engine.
Thanks again for all your help here!
Tom