flink-on-k8s-operator
Support HA mode for JobManager
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
I have tried to use ZooKeeper for HA, but it always fails with this error:
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(ac4f896e2897c5bc96eef1d93be5433d, LocalRpcInvocation(requestJobStatus(JobID, Time))) sent to akka.tcp://flink@flinkjobcluster-sample-ha-jobmanager:6123/user/dispatcher because the fencing token is null.
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Could you share your steps for enabling HA mode? I can try to reproduce it.
1. Add the following Flink properties (see the CR sketch after this list for where they go):

   high-availability: zookeeper
   high-availability.zookeeper.quorum: IP:2181
   high-availability.storageDir: hdfs://Test/user/HA/
   high-availability.cluster-id: "123"
   high-availability.jobmanager.port: "6123"
   high-availability.zookeeper.path.root: /flink-k8s
   restart-strategy: failure-rate
   restart-strategy.failure-rate.max-failures-per-interval: "5000"
   restart-strategy.failure-rate.failure-rate-interval: 6 min
   restart-strategy.failure-rate.delay: 1 s

2. Start the cluster.
3. Delete the JobManager pod, then get the job pod log.
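For context, a minimal sketch of where properties like these would live when deploying through this operator, assuming the FlinkCluster CRD's flinkProperties field; the metadata name is illustrative and the values mirror the list above:

```yaml
# Hypothetical FlinkCluster excerpt; apiVersion and field names follow
# this operator's CRD, values mirror the reproduction steps above.
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample-ha    # illustrative
spec:
  flinkProperties:
    high-availability: zookeeper
    high-availability.zookeeper.quorum: "IP:2181"
    high-availability.storageDir: hdfs://Test/user/HA/
    high-availability.cluster-id: "123"
    high-availability.jobmanager.port: "6123"
    high-availability.zookeeper.path.root: /flink-k8s
```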
Can we edit the job to run in detached mode instead of the current attached mode?
Why would that make a difference? In detached mode, we still need to wait for the job to finish, right?
I fixed it by adding Flink properties.
It would be nice if you could share your properties.
It seems HA support based on the native Kubernetes API is planned:
https://issues.apache.org/jira/browse/FLINK-12884
Could you help me close this issue? It was a problem with my flink-conf.yaml.
@Mrart Could you please share your config file?
@Mrart @functicons I have added the HA JobManager properties below to the "flink properties" section in the CRD and noticed the JobManager/TaskManager pods crashing. From the logs, it appears there is an issue with "high-availability.storageDir". I have tried an S3 location, a folder path on the ZooKeeper pod, etc., but it looks like a Hadoop security context and an HDFS location are expected for HA to work. Is this understanding correct? Could you please help?
Flink properties:

high-availability: zookeeper
high-availability.zookeeper.quorum: my-kafka-zookeeper:2181
high-availability.storageDir: s3://...
high-availability.cluster-id: "zk-ha1"
high-availability.jobmanager.port: "6126"
high-availability.zookeeper.path.root: /flink-k8s
restart-strategy: failure-rate
Below is the JobManager error log:
2020-11-05 22:24:56,185 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-11-05 22:24:56,186 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2020-11-05 22:24:57,807 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at flink-operator-jobmanager:6126
2020-11-05 22:25:00,459 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-11-05 22:25:00,671 INFO akka.remote.Remoting - Starting remoting
2020-11-05 22:25:01,461 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@flink-operator-jobmanager:6126]
2020-11-05 22:25:01,952 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@flink-operator-jobmanager:6126
2020-11-05 22:25:02,050 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
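A hedged reading of this log: the first line ("Hadoop cannot be found in the Classpath") together with the final IOException suggests no FileSystem is registered for the s3:// scheme, rather than a hard HDFS requirement. With the official Flink Docker images (1.11+), the bundled S3 filesystem plugin can be enabled via the ENABLE_BUILT_IN_PLUGINS environment variable; the envVars field below assumes this operator's FlinkCluster CRD, and the jar version must match your Flink version:

```yaml
# Sketch: load Flink's built-in S3 filesystem plugin so that
# high-availability.storageDir: s3://... has a FileSystem implementation.
# S3 credentials (s3.access-key/s3.secret-key or IAM) are still needed.
spec:
  envVars:
    - name: ENABLE_BUILT_IN_PLUGINS
      value: flink-s3-fs-presto-1.11.2.jar   # match your Flink version
```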
We have introduced a native K8s HA service in Flink 1.12. After that, you will not have to manage a ZooKeeper cluster. Refer here [1] for how to enable K8s HA for your Flink cluster and here [2] for more information about how it works.
[1] https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/kubernetes_ha.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink
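For reference, the setup described in [1] comes down to a few flink-conf.yaml entries; a minimal sketch, where the cluster-id and storage path are illustrative:

```yaml
# Kubernetes HA services (Flink 1.12+), per [1]; no ZooKeeper needed.
kubernetes.cluster-id: my-flink-cluster   # illustrative
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://flink-ha/recovery   # illustrative path
```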
Thanks @wangyang0918. I will give it a try.
@shravangit20 how did your attempt with Native Kubernetes HA go?
@wangyang0918 is it possible to use the native Kubernetes HA integration in the context of the flink k8s operator? In other words, the JobManager uses Kubernetes services for leader election, etc. (instead of ZooKeeper), but the rest of the K8s orchestration is done via the operator. Is that possible?
@palanieppan-m Yes, KubernetesHAService works both for Flink standalone deployments on K8s (including the k8s operator) and for the native K8s integration. Refer here for how to enable the HA service.
Please note that a service account with sufficient permissions (create/watch/delete ConfigMaps) must be set for both the JobManager and TaskManager pods; a sketch of such RBAC follows below.
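For illustration, a minimal RBAC sketch granting that ConfigMap access; all names are placeholders, and the verb list follows the requirement stated above plus the get/list/update access commonly needed for leader election:

```yaml
# Placeholder names throughout; binds ConfigMap permissions to the
# service account used by the JobManager and TaskManager pods.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-role
rules:
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-role
subjects:
  - kind: ServiceAccount
    name: flink-service-account   # placeholder
```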
@wangyang0918 I am unable to enable Kubernetes HA while deploying with this flink-k8s-operator. Specifically, I made changes to allow multiple JobManager replicas in the operator, then tried to deploy a Flink app with a ZooKeeper HA configuration, but the first JobManager fails to come up. The JobManager pod tries to connect to an Akka endpoint on the JobManager service, but the target port was never open.
2021-02-25 02:38:29,744 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
2021-02-25 02:38:41,956 INFO akka.remote.transport.ProtocolStateActor [] - No response from remote for outbound association. Associate timed out after [20000 ms].
2021-02-25 02:38:41,961 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@flinkjobcluster-ha-zk-sample-jobmanager:36019] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flinkjobcluster-ha-zk-sample-jobmanager:36019]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2021-02-25 02:38:42,154 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flinkjobcluster-ha-zk-sample-jobmanager/10.83.2.99:36019
I am trying to understand what's happening here. I am using Flink 1.12.1.
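One hedged observation, not a confirmed diagnosis: with ZooKeeper HA the JobManager publishes its actual RPC address to ZooKeeper, and the RPC port can be randomized (here 36019), so peers resolving the JobManager through the Kubernetes Service may dial a port the Service never exposes. Pinning the port to one the Service exposes, as the earlier config in this thread does, may be worth trying:

```yaml
# Sketch: pin the HA RPC port so it matches a containerPort/Service
# port actually exposed on the JobManager (6123 is the usual RPC port).
high-availability.jobmanager.port: "6123"
```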