
bad state after delete issued

Open andrewgdavis opened this issue 5 years ago • 3 comments

Similar issue to https://github.com/lyft/flinkk8soperator/issues/13.

Basically, there was an access issue to the savepoint location (I was using S3), and after seeing in the logs that something was wrong, I issued `kubectl delete -f myFlinkApp.yaml`.

However, the jobs would not delete, and the following error message loops infinitely:

```
{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"info","msg":"Logged Warning event: SavepointFailed: Failed to take savepoint {java.util.concurrent.CompletionException java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$triggerSavepoint$0(LegacyScheduler.java:510)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)\n\tat java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)\n\tat java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:504)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:638)\n\tat sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)\n\t... 22 more\nCaused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepointInternal(CheckpointCoordinator.java:428)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:503)\n\t... 28 more\n}","ts":"2019-10-10T21:16:34Z"}
```
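The underlying failure is that the savepoint was triggered while not all tasks were running. Before issuing the delete, the job and task state can be checked directly against the JobManager REST API; a rough sketch below, assuming port-forwarding to the service name taken from the URL in the operator log (yours will differ):

```sh
# Forward the JobManager REST port locally (service name is an assumption
# based on the hostname the operator logs; adjust to your deployment).
kubectl -n flink-operator port-forward svc/davis-wordcount-operator-example-0a501337 8081:8081 &

# List jobs and their overall state (standard Flink REST endpoint).
curl -s http://localhost:8081/jobs/overview

# Inspect per-vertex/task status for a specific job id returned above.
curl -s http://localhost:8081/jobs/<job-id>
```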

It may be nice to have a pre-emptive check, or a configurable number of retries on failure.


After manually issuing `kubectl delete deployment $nameOfFlinkAppDeployment`, the flink operator gets into a bad state trying to manage the jobs. Issuing `kubectl delete flinkapplication $nameOfFlinkApp` results in a failure message in the operator:

```
{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"warning","msg":"Failed to reconcile resource flink-operator/davis-wordcount-operator-example: GetJobs call failed with status FAILED and message []: Get http://davis-wordcount-operator-example-0a501337.flink-operator:8081/jobs: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)","ts":"2019-10-10T21:26:47Z"}
```

The workaround was to delete the finalizer via `kubectl edit flinkapplication $nameOfFlinkApp`.
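For anyone hitting the same thing, the finalizer removal can also be done non-interactively with `kubectl patch`; this is just a sketch using the app name and namespace from the logs above, so adjust both to your deployment:

```sh
# Clear the finalizers so the stuck FlinkApplication can be garbage-collected.
# A merge patch setting metadata.finalizers to null removes the field entirely.
kubectl -n flink-operator patch flinkapplication davis-wordcount-operator-example \
  --type merge -p '{"metadata":{"finalizers":null}}'

# Any pending delete should now complete.
kubectl -n flink-operator delete flinkapplication davis-wordcount-operator-example --ignore-not-found
```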

andrewgdavis · Oct 10 '19 21:10

@andrewgdavis

This is the default setting. We try to savepoint the job before bringing the cluster down. Please set `deleteMode` to `None` if you want deletion to go through without savepointing: https://github.com/lyft/flinkk8soperator/blob/master/docs/crd.md
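For reference, that looks roughly like the following in the FlinkApplication spec; a minimal sketch modeled on the examples in the repo (the apiVersion depends on the operator version, and the rest of the spec is elided):

```yaml
apiVersion: flink.k8s.io/v1beta1   # may be v1alpha1 on older operator versions
kind: FlinkApplication
metadata:
  name: davis-wordcount-operator-example
  namespace: flink-operator
spec:
  # Skip the savepoint on delete; the job is simply torn down.
  # Other documented values include Savepoint (default) and ForceCancel.
  deleteMode: None
  # ... remaining spec fields (image, jarName, parallelism, etc.) unchanged
```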

anandswaminathan · Oct 15 '19 00:10

Savepointing was desired; it just happened to fail, which in turn put the operator into a bad state.

andrewgdavis · Oct 16 '19 20:10

@andrewgdavis I missed the last line in your issue.

We already have that. You can configure it via the setting here: https://github.com/lyft/flinkk8soperator/blob/master/pkg/controller/config/config.go#L26
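For completeness, the operator reads these values from its config file; a rough sketch of what the relevant block might look like, with the key names being assumptions based on my reading of that Config struct (verify them against the linked config.go before using):

```yaml
operator:
  # Backoff/retry tuning for failed operations such as savepointing
  # (key names are assumptions; confirm against pkg/controller/config/config.go).
  baseBackoffDuration: 100ms
  maxBackoffDuration: 30s
  # Total time the operator keeps retrying before giving up.
  maxErrDuration: 5m
```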

anandswaminathan · Nov 21 '19 23:11