flink-on-k8s-operator

Support HA mode for JobManager

[Open] functicons opened this issue 5 years ago • 16 comments

https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html

functicons, Sep 22 '19 00:09

I have tried to use ZooKeeper for HA, but it always fails with this error:

Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(ac4f896e2897c5bc96eef1d93be5433d, LocalRpcInvocation(requestJobStatus(JobID, Time))) sent to akka.tcp://flink@flinkjobcluster-sample-ha-jobmanager:6123/user/dispatcher because the fencing token is null.
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:63)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Mrart, Jan 03 '20 01:01

Could you share the steps you used to enable HA mode? I can try to reproduce it.

functicons, Jan 03 '20 19:01

1. Add Flink properties:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: IP:2181,
    high-availability.storageDir: hdfs://Test/user/HA/
    high-availability.cluster-id: "123"
    high-availability.jobmanager.port: "6123"
    high-availability.zookeeper.path.root: /flink-k8s
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: "5000"
    restart-strategy.failure-rate.failure-rate-interval: 6 min
    restart-strategy.failure-rate.delay: 1 s
2. Start the cluster.
3. Delete the JM pod, then get the job pod log.
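To show where the step 1 properties live, here is a sketch of a FlinkCluster manifest carrying them under `spec.flinkProperties`. The `apiVersion`, `kind`, and metadata name are assumptions modeled on the operator's sample manifests; the image, jobManager, taskManager, and job sections are omitted, and `zk-host` is a placeholder standing in for `IP` above. All values are quoted so they stay strings.

```yaml
# Sketch only: apiVersion/kind follow the operator's sample manifests;
# image, jobManager, taskManager and job sections are omitted.
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinkjobcluster-sample-ha        # hypothetical name
spec:
  flinkProperties:
    # ZooKeeper-based HA (zk-host is a placeholder for the quorum address)
    high-availability: "zookeeper"
    high-availability.zookeeper.quorum: "zk-host:2181"
    high-availability.storageDir: "hdfs://Test/user/HA/"
    high-availability.cluster-id: "123"
    high-availability.jobmanager.port: "6123"
    high-availability.zookeeper.path.root: "/flink-k8s"
    # Restart strategy from step 1
    restart-strategy: "failure-rate"
    restart-strategy.failure-rate.max-failures-per-interval: "5000"
    restart-strategy.failure-rate.failure-rate-interval: "6 min"
    restart-strategy.failure-rate.delay: "1 s"
```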

Mrart, Jan 07 '20 03:01

Could we run the job in detached mode instead of the current attached mode?

Mrart, Jan 08 '20 08:01

Why does it make a difference? In detached mode, we still need to wait for the job to finish, right?

functicons, Jan 09 '20 04:01

I fixed it by adding Flink properties.

Mrart, Jan 13 '20 11:01

It would be nice if you could share your properties.

functicons, Jan 14 '20 07:01

It seems that HA based on the native Kubernetes API is planned:

https://issues.apache.org/jira/browse/FLINK-12884

elanv, Feb 05 '20 02:02

Could you help me close this issue? It was a problem with my flink-conf.yaml.

Mrart, Apr 03 '20 08:04

@Mrart Could you please share your config file?

shravangit20, Nov 03 '20 23:11

@Mrart @functicons I have added the HA JobManager properties below to the "flink properties" section in the CRD and noticed the JobManager/TaskManager pods crashing. From the logs, the issue appears to be with "high-availability.storageDir". I have tried an S3 location, a folder path on the ZooKeeper pod, etc., but it seems that an HDFS security context and an HDFS location are expected for HA to work. Is this understanding correct? Could you please help?

Flink properties:

    high-availability: zookeeper
    high-availability.zookeeper.quorum: my-kafka-zookeeper:2181
    high-availability.storageDir: s3://...
    high-availability.cluster-id: "zk-ha1"
    high-availability.jobmanager.port: "6126"
    high-availability.zookeeper.path.root: /flink-k8s
    restart-strategy: failure-rate

Below is the JobManager error log:

2020-11-05 22:24:56,185 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-11-05 22:24:56,186 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2020-11-05 22:24:57,807 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system at flink-operator-jobmanager:6126
2020-11-05 22:25:00,459 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2020-11-05 22:25:00,671 INFO akka.remote.Remoting - Starting remoting
2020-11-05 22:25:01,461 INFO akka.remote.Remoting - Remoting started; listening on addresses :[akka.tcp://flink@flink-operator-jobmanager:6126]
2020-11-05 22:25:01,952 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Actor system started at akka.tcp://flink@flink-operator-jobmanager:6126
2020-11-05 22:25:02,050 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
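A likely cause of this particular error, offered as an assumption rather than a confirmed diagnosis: the HA storage directory does not have to be HDFS (the `HadoopSecurityContext` line is only INFO), but Flink needs a filesystem plugin to resolve `s3://` URIs. The official Flink Docker images can copy a bundled plugin from `opt/` into `plugins/` at startup when the `ENABLE_BUILT_IN_PLUGINS` environment variable names its jar. The fragment below shows such an environment entry; the jar name assumes Flink 1.12.1, and where the entry goes in the FlinkCluster spec depends on the operator version.

```yaml
# Kubernetes EnvVar entry for the JobManager and TaskManager containers.
# Placement within the FlinkCluster resource is an assumption; check the
# operator's CRD reference for the field that sets container env vars.
- name: ENABLE_BUILT_IN_PLUGINS
  # Semicolon-separated list of jars from the image's opt/ directory.
  # flink-s3-fs-hadoop-1.12.1.jar assumes Flink 1.12.1; match your version.
  value: "flink-s3-fs-hadoop-1.12.1.jar"
```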

shravangit20, Nov 06 '20 13:11

We have introduced a native K8s HA service in Flink 1.12. With it, you no longer have to manage a ZooKeeper cluster. See [1] for how to enable K8s HA for your Flink cluster and [2] for more information about how it works.

[1] https://ci.apache.org/projects/flink/flink-docs-master/deployment/ha/kubernetes_ha.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-144:+Native+Kubernetes+HA+for+Flink
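For reference, a minimal sketch of the flink-conf.yaml entries that the Flink 1.12 Kubernetes HA documentation describes, replacing the ZooKeeper settings used earlier in this thread. The cluster id and storage path are placeholders; with the operator, these keys would presumably go into the `flinkProperties` section rather than a file.

```yaml
# Kubernetes HA service (Flink 1.12+), instead of high-availability: zookeeper.
# Unique per Flink cluster; identifies the cluster's HA ConfigMaps.
kubernetes.cluster-id: my-flink-cluster        # placeholder
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
# Durable storage for JobManager metadata; the ConfigMaps only hold pointers to it.
high-availability.storageDir: hdfs:///flink/recovery   # placeholder path
```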

wangyang0918, Dec 07 '20 03:12

Thanks @wangyang0918. I will give it a try.

shravangit20, Dec 10 '20 16:12

@shravangit20 how did your attempt with Native Kubernetes HA go?

@wangyang0918 is it possible to use the native Kubernetes HA integration with the Flink k8s operator? In other words, the JobManager would use Kubernetes services for leader election etc. (instead of ZooKeeper), while the rest of the k8s orchestration is done by the operator. Is that possible?

palanieppan-m, Feb 10 '21 02:02

@palanieppan-m Yes, KubernetesHAService works both for Flink standalone deployments on K8s (including the k8s operator) and for the native K8s integration. Refer here for how to enable the HA service.

Please note that a service account with sufficient permissions (create/watch/delete ConfigMaps) must be set for both the JobManager and TaskManager pods.
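The permissions mentioned above can be granted with standard Kubernetes RBAC objects. A minimal sketch: the names and the `default` namespace are hypothetical, and the verb list is somewhat broader than the three named in the comment, since the HA service also reads and updates ConfigMaps. How the operator's FlinkCluster spec attaches the service account to the JobManager and TaskManager pods is not shown here.

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-ha                 # hypothetical name
  namespace: default             # assumed namespace
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-ha-configmaps      # hypothetical name
  namespace: default
rules:
  # ConfigMaps in the core API group ("") hold leader info and HA pointers.
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["create", "get", "list", "watch", "update", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-ha-configmaps
  namespace: default
subjects:
  - kind: ServiceAccount
    name: flink-ha
    namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-ha-configmaps
```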

wangyang0918, Feb 18 '21 02:02

@wangyang0918 I am unable to enable Kubernetes HA when deploying with this flink-k8s-operator. Specifically, I changed the operator to allow multiple JobManager replicas, then tried to deploy a Flink app with a ZooKeeper HA configuration, but the first JobManager fails to come up. The JobManager pod tries to connect to an Akka endpoint on the JobManager service, but the target port is never open.

2021-02-25 02:38:29,744 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
2021-02-25 02:38:41,956 INFO  akka.remote.transport.ProtocolStateActor                     [] - No response from remote for outbound association. Associate timed out after [20000 ms].
2021-02-25 02:38:41,961 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@flinkjobcluster-ha-zk-sample-jobmanager:36019] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@flinkjobcluster-ha-zk-sample-jobmanager:36019]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2021-02-25 02:38:42,154 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: flinkjobcluster-ha-zk-sample-jobmanager/10.83.2.99:36019

I am trying to understand what's happening here. I am using Flink 1.12.1.
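One possible explanation for the log above, not confirmed in this thread: in HA mode Flink takes the JobManager RPC port from `high-availability.jobmanager.port`, whose default `0` picks a random free port (here 36019), so a Kubernetes Service that only exposes the usual RPC port cannot route to it. The earlier configurations in this thread set the property explicitly. The sketch below pins it to 6123, assuming that is the RPC port the operator's JobManager Service exposes; the surrounding `spec.flinkProperties` layout is likewise an assumption.

```yaml
spec:
  flinkProperties:
    high-availability: "zookeeper"
    # Default "0" chooses a random free port in HA mode; pin it instead.
    # 6123 is assumed to match the JobManager Service's RPC port.
    high-availability.jobmanager.port: "6123"
```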

palanieppan-m, Feb 25 '21 02:02