
[Question] SeaTunnel + Flink Engine: Does it support Flink savepoint/last-state upgrades?

Status: Open. tomma-a opened this issue 3 weeks ago • 7 comments

Hello SeaTunnel engineers,

I have a question about running SeaTunnel on the Flink engine: does it support Flink savepoint/last-state upgrades?

In my test, SeaTunnel 2.3.12 runs on Flink 1.17 or 1.18 with the Flink `upgradeMode` set to `savepoint` or `last-state`. When I upgrade the `FlinkDeployment` with the Flink Kubernetes operator, I encounter the following error:

2025-12-10 10:44:00,103 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2025-12-10 10:44:00,104 INFO org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
2025-12-10 10:44:00,107 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: Kafka-Source.
2025-12-10 10:44:00,108 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka-Source closed.
2025-12-10 10:44:00,111 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: Kafka-Source from checkpoint.
2025-12-10 10:44:00,221 WARN org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext [] - Get flink job id failed
java.lang.IllegalStateException: Initialize flink job-id failed
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:152) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getFlinkJobId(FlinkSourceSplitEnumeratorContext.java:100) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.<init>(FlinkSourceSplitEnumeratorContext.java:57) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:116) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:48) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:444) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:406) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:155) ~[flink-dist-1.17.2.jar:1.17.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.whenComplete(Unknown Source) ~[?:?]

Thanks,

Tom

tomma-a, Dec 10 '25 10:12

It seems we cannot restore a SeaTunnel job from a Flink checkpoint/savepoint.

tomma-a, Dec 10 '25 11:12

> It seems we cannot restore a SeaTunnel job from a Flink checkpoint/savepoint.

I have tested this before. If it is not a CDC synchronization task, you need to set the job mode to streaming; everything else can be configured normally. State is then saved per split, and the job restores from the checkpoint during recovery. The same logic should apply to CDC tasks.
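The per-split idea described above (each split's position is stored when a checkpoint is taken and read back on recovery) can be sketched in plain Java. This is a hypothetical model, not SeaTunnel's or Flink's actual classes: `SplitOffsetState` and `SplitOffsetStateDemo` are invented names, and each split is assumed to be identified by a string ID such as a Kafka topic-partition, with a `long` offset as its position.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-split state holder (NOT SeaTunnel's or Flink's real API).
// Each split, e.g. a Kafka topic-partition, maps to the next offset to read.
class SplitOffsetState {
    private final Map<String, Long> offsets = new HashMap<>();

    // Record progress for one split after its records are emitted.
    void advance(String splitId, long nextOffset) {
        offsets.put(splitId, nextOffset);
    }

    // Checkpoint: take an immutable copy of every split's current position.
    Map<String, Long> snapshot() {
        return Map.copyOf(offsets);
    }

    // Recovery: rebuild the state from a previously taken snapshot.
    static SplitOffsetState restore(Map<String, Long> snapshot) {
        SplitOffsetState state = new SplitOffsetState();
        state.offsets.putAll(snapshot);
        return state;
    }

    // Position to resume from; a split absent from the snapshot starts at 0.
    long positionOf(String splitId) {
        return offsets.getOrDefault(splitId, 0L);
    }
}

class SplitOffsetStateDemo {
    public static void main(String[] args) {
        SplitOffsetState live = new SplitOffsetState();
        live.advance("info-0", 120L);
        live.advance("info-1", 75L);

        // A checkpoint is taken here.
        Map<String, Long> checkpoint = live.snapshot();

        // Progress made after the checkpoint is not part of the snapshot.
        live.advance("info-0", 200L);

        SplitOffsetState recovered = SplitOffsetState.restore(checkpoint);
        System.out.println(recovered.positionOf("info-0")); // 120
        System.out.println(recovered.positionOf("info-1")); // 75
    }
}
```

Because `snapshot()` returns a copy, later calls to `advance` cannot change what a completed checkpoint contains, which is the property a restore relies on.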

yzeng1618, Dec 11 '25 01:12

@yzeng1618 First, I really appreciate your input!

Here are the settings from my test above.

SeaTunnel job settings (it's a streaming job):

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  flink.execution.checkpointing.mode = "EXACTLY_ONCE"
  flink.execution.checkpointing.timeout = 600000
}

source {
  Kafka {
    plugin_output = "fake2"
    topic = info
    consumer.group = "testr"
    bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
  }
}

sink {
  Kafka {
    plugin_input = "fake2"
    topic = topc
    bootstrap.servers = "tom-cluster1-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
```

Then I run the SeaTunnel job on Flink through the Flink Kubernetes operator, using this `FlinkDeployment` custom resource:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example-2
  namespace: kafka
spec:
  ......

  volumes:
    - name: seatunnel-config
      configMap:
        name: seatunnel-config
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-15-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: savepoint
```

The first time I `kubectl apply` the YAML above to a Kubernetes cluster, the SeaTunnel job runs normally and Flink checkpoints are saved successfully at regular intervals.

Then I make some changes to the YAML file and apply it again, which triggers an upgrade. Because my Flink `upgradeMode` is `savepoint`, the new job should restore from the savepoint, but the error above occurs and the job can NOT be restored from the last checkpoint or savepoint. `last-state` mode, which restores from the latest checkpoint, does not work either.
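Some context on why the first deployment works but the upgrade fails: with the Flink Kubernetes operator, `upgradeMode: savepoint` takes a savepoint while suspending the running job and starts the new job from it, `last-state` starts the new job from the latest checkpoint recorded in the HA metadata without taking a new savepoint, and `stateless` starts from empty state. Both stateful modes therefore presumably go through the `SourceCoordinator.resetToCheckpoint` → `FlinkSource.restoreEnumerator` path seen in the first stack trace, whereas a fresh submission creates the enumerator instead of restoring it. The Java sketch below models only that choice of restore source; `UpgradeMode`, `RestorePlanner`, `restorePath`, and the example OSS paths are hypothetical, not operator classes or real locations.

```java
import java.util.Optional;

// Hypothetical model of the three Flink Kubernetes operator upgrade modes.
enum UpgradeMode { STATELESS, SAVEPOINT, LAST_STATE }

class RestorePlanner {
    // Returns the state path the upgraded job would start from, or empty when
    // it starts without state. Simplified: error handling, such as a missing
    // savepoint, is not modeled.
    static Optional<String> restorePath(UpgradeMode mode,
                                        String savepointTakenOnSuspend,
                                        String latestCheckpoint) {
        switch (mode) {
            case SAVEPOINT:
                // Savepoint taken while the old job was being suspended.
                return Optional.ofNullable(savepointTakenOnSuspend);
            case LAST_STATE:
                // Newest completed checkpoint; no new savepoint is taken.
                return Optional.ofNullable(latestCheckpoint);
            default:
                // STATELESS: the new job starts from empty state.
                return Optional.empty();
        }
    }
}

class RestorePlannerDemo {
    public static void main(String[] args) {
        // Illustrative paths only; the real OSS locations are not in the issue.
        String sp = "oss://bucket/savepoints/savepoint-1";
        String chk = "oss://bucket/checkpoints/chk-42";
        for (UpgradeMode mode : UpgradeMode.values()) {
            Optional<String> path = RestorePlanner.restorePath(mode, sp, chk);
            // A non-empty path means the source enumerator is restored, not created.
            System.out.println(mode + " -> " + path.orElse("(empty state)"));
        }
    }
}
```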

Please correct me if I'm wrong about this. Thanks,

Tom

tomma-a, Dec 11 '25 01:12

Here is some additional exception output that could help in figuring out the problem:
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.18.0.jar:1.18.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "obj" is null
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:142) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-fd7fca630f268e360191f42ba11014b3:2.3.12]
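The `Caused by` entry shows that `getJobIdForV15` (line 142) called `getClass()` on a `null` reference named `obj`, which is presumably what surfaces as `IllegalStateException: Initialize flink job-id failed` in the first trace. The actual SeaTunnel source is not shown in this thread, so the Java sketch below is only a hypothetical illustration of that failure pattern, assuming the job ID is read reflectively from some runtime object that can be `null` on the restore path. `FakeContext`, `ReflectiveJobId`, `readField`, `tryReadField`, and the field name `jobId` are all invented for the example.

```java
import java.lang.reflect.Field;
import java.util.Optional;

// Hypothetical stand-in for a runtime object that holds the job ID.
class FakeContext {
    private final String jobId;

    FakeContext(String jobId) {
        this.jobId = jobId;
    }
}

class ReflectiveJobId {
    // Unsafe pattern: obj.getClass() is called without a null check, so a
    // null argument throws NullPointerException before any reflection runs.
    static Object readField(Object obj, String name) throws ReflectiveOperationException {
        Field field = obj.getClass().getDeclaredField(name);
        field.setAccessible(true);
        return field.get(obj);
    }

    // Defensive variant: a null object or a missing/inaccessible field yields
    // Optional.empty(), letting the caller fall back instead of failing.
    static Optional<Object> tryReadField(Object obj, String name) {
        if (obj == null) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(readField(obj, name));
        } catch (ReflectiveOperationException e) {
            return Optional.empty();
        }
    }
}

class ReflectiveJobIdDemo {
    public static void main(String[] args) {
        System.out.println(ReflectiveJobId.tryReadField(new FakeContext("job-123"), "jobId"));
        System.out.println(ReflectiveJobId.tryReadField(null, "jobId"));
    }
}
```

Returning `Optional.empty()` lets a caller degrade gracefully (for example, by logging a warning like the existing `Get flink job id failed` line) instead of aborting scheduler creation; whether a missing job ID is actually safe to ignore inside SeaTunnel is a separate question.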

tomma-a, Dec 11 '25 02:12

Additional info: for Flink, I use the RocksDB state backend, with OSS as the savepoint/checkpoint storage.

tomma-a, Dec 11 '25 02:12

The upgrade mode you describe has not been tested, so its compatibility cannot be confirmed. I recommend trying the Zeta engine instead.

yzeng1618, Dec 11 '25 03:12

Thanks @yzeng1618 for your input on this.

I will give Zeta a try.

That said, I think there should be official documentation, for every SeaTunnel release, on whether a SeaTunnel job can be restored from a Flink checkpoint/savepoint (and for which Flink versions), since the official docs state that SeaTunnel supports the Flink engine.

Thanks again for all your help here!

Tom

tomma-a, Dec 11 '25 07:12