
[Feature][Executor] Add K8S Executor for task dispatching

Open EricGao888 opened this issue 2 years ago • 15 comments

Search before asking

  • [X] I had searched in the issues and found no similar feature requirement.

Description

  • Still WIP.
  • Discussions are more than welcome.

Why we need it?

  • Higher resource utilization - The core idea is one task per pod. With the help of the K8S metrics server, users can clearly see the resource consumption (both CPU and memory) of each task, allocate a suitable amount of resources to each task pod based on that data, and drive overall resource consumption toward the minimum (see the pod sketch after this list). In the future, we might even use K8S VPA to adjust the resource allocation automatically.
  • Better monitoring (pod level, cloud native, K8S metrics server)
  • Easier fault tolerance
  • Faster iteration: to update a task plugin, there is no need to upgrade all workers.
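To make the one-task-per-pod idea concrete, here is a minimal sketch of how a K8S executor could create a task pod with explicit CPU/memory requests and limits, assuming the fabric8 kubernetes-client (6.x API). The namespace, image, and label names are illustrative only, not part of the actual proposal:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class TaskPodFactory {

    /** Build a one-shot pod for a single task, with explicit resource requests/limits. */
    public Pod buildTaskPod(String taskInstanceId, String cpu, String memory) {
        return new PodBuilder()
                .withNewMetadata()
                    .withName("ds-task-" + taskInstanceId)
                    .addToLabels("app", "dolphinscheduler-task")
                    .addToLabels("task-instance-id", taskInstanceId)
                .endMetadata()
                .withNewSpec()
                    .withRestartPolicy("Never")                           // one task, one pod, one run
                    .addNewContainer()
                        .withName("task")
                        .withImage("dolphinscheduler-worker:latest")      // hypothetical image
                        .withNewResources()
                            .addToRequests("cpu", new Quantity(cpu))      // e.g. "500m"
                            .addToRequests("memory", new Quantity(memory))// e.g. "512Mi"
                            .addToLimits("cpu", new Quantity(cpu))
                            .addToLimits("memory", new Quantity(memory))
                        .endResources()
                    .endContainer()
                .endSpec()
                .build();
    }

    /** Submit the task pod through the K8S API server. */
    public void submit(Pod pod) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            client.pods().inNamespace("dolphinscheduler").resource(pod).create();
        }
    }
}
```

Because each pod carries per-task requests and limits, the metrics-server data maps directly onto individual tasks, which is what makes per-task tuning (and later VPA) possible.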

How to design it?

Compatibility

  • Do some decoupling and abstraction work before adding a K8S executor, and make the executor configurable. If users choose not to use the K8S Executor, they will still be able to use the current way - one worker per pod (see the sketch below).
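A rough sketch of that decoupling, with hypothetical interface, class, and property names (the real abstraction would live wherever the master currently dispatches tasks):

```java
// Stand-in for DolphinScheduler's existing TaskExecutionContext, so this sketch is self-contained.
class TaskExecutionContext { }

public interface TaskDispatcher {
    void dispatch(TaskExecutionContext context);
}

// Current path: hand the task to a long-lived worker (one worker per pod).
class WorkerTaskDispatcher implements TaskDispatcher {
    @Override
    public void dispatch(TaskExecutionContext context) { /* send to a worker via Netty, as today */ }
}

// Proposed path: create a dedicated task pod through the K8S API server (one task per pod).
class K8sTaskDispatcher implements TaskDispatcher {
    @Override
    public void dispatch(TaskExecutionContext context) { /* create a task pod via the K8S executor */ }
}

class TaskDispatcherFactory {
    // e.g. a hypothetical property: master.executor.type = kubernetes | worker
    static TaskDispatcher create(String executorType) {
        return "kubernetes".equalsIgnoreCase(executorType)
                ? new K8sTaskDispatcher()
                : new WorkerTaskDispatcher();
    }
}
```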

Fault Tolerance

  • Make DS worker stateless in K8S environment (one task per pod)
  • The DS worker does not need to rely on ZK. We could start a thread in the K8S executor that uses a K8S watcher to subscribe to state-change events of worker pods. Since there is one task per pod, there is no need to decide which pod should resume the task when a worker crashes - just start a new one (see the watcher sketch below).
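A minimal sketch of that watcher thread, assuming the fabric8 kubernetes-client; the label names and the resubmit hook are hypothetical:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public class TaskPodWatcher {

    /** Subscribe to state changes of task pods instead of relying on ZK-based fault tolerance. */
    public void watchTaskPods(KubernetesClient client, String namespace) {
        client.pods()
              .inNamespace(namespace)
              .withLabel("app", "dolphinscheduler-task")
              .watch(new Watcher<Pod>() {
                  @Override
                  public void eventReceived(Action action, Pod pod) {
                      String phase = pod.getStatus() == null ? null : pod.getStatus().getPhase();
                      if ("Failed".equals(phase)) {
                          // One task per pod: no need to pick another worker to resume on,
                          // simply create a fresh pod for the same task.
                          resubmitTask(pod.getMetadata().getLabels().get("task-instance-id"));
                      }
                  }

                  @Override
                  public void onClose(WatcherException cause) {
                      // Re-establish the watch if the connection to the API server drops.
                  }
              });
    }

    private void resubmitTask(String taskInstanceId) {
        // Hypothetical hook: hand the task instance back to the dispatcher / queue.
    }
}
```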

Communication With Master

  • Is it possible to use a simpler way than Netty for communication between worker and master in the K8S environment, such as a message queue? Since there is one task per pod and the K8S Executor can kill the pod through the K8S API server, it looks unnecessary to run a Netty server and client inside the worker in the K8S environment. For logging, we could use a PV or remote logging. Maybe we could also find a simpler way for the K8S Executor to pass the TaskExecutionContext into the worker, e.g. the hand-off sketched below.
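As one illustration of such a simpler hand-off (not a settled design), the K8S Executor could serialize the TaskExecutionContext to JSON and inject it into the task pod as an environment variable; a ConfigMap or a message queue would serve the same purpose. The env-var name and JSON helper below are hypothetical, and the fabric8 kubernetes-client is assumed:

```java
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

public class TaskContextHandoff {

    /** Master side: inject the serialized context into the task pod
     *  (assumes the builder already contains the task container, as in the pod factory sketch above). */
    public static Pod withContext(PodBuilder taskPodBuilder, String taskExecutionContextJson) {
        return taskPodBuilder
                .editOrNewSpec()
                    .editFirstContainer()
                        .addNewEnv()
                            .withName("DS_TASK_EXECUTION_CONTEXT")   // hypothetical variable name
                            .withValue(taskExecutionContextJson)
                        .endEnv()
                    .endContainer()
                .endSpec()
                .build();
    }
}

// Worker-pod side: the one-shot worker reads the context on startup instead of waiting for a
// Netty message, runs the task plugin, writes logs to a PV or remote log store, and exits so
// the pod reaches a terminal phase that the watcher can report.
class K8sWorkerEntrypoint {
    public static void main(String[] args) {
        String contextJson = System.getenv("DS_TASK_EXECUTION_CONTEXT");
        // TaskExecutionContext context = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); // hypothetical helper
        // ... execute the task with this context, then exit with the task result code.
    }
}
```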

Combined With DS Hadoop Executor

  • To distinguish the two, we temporarily refer to the current executor as the DS Hadoop Executor.
  • There is no perfect solution, only trade-offs. What are the pros and cons?
  • detach (async) vs attach (sync) - for sync tasks, whose resource usage is hard to estimate, use the Hadoop Executor (one worker per pod); for async tasks, whose resource usage is easy to estimate, use the K8S Executor (one task per pod)
  • Two queues for the two executors: HadoopTaskPriorityQueue and K8STaskPriorityQueue (a routing sketch follows this list)
  • For the Hadoop Executor (one worker per pod), we could add a worker pod elasticity feature, also mentioned in #9337. One possible solution is to use K8S HPA and KEDA with the number of queued and running tasks as the metric.
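A routing sketch for the two-queue idea. The queue and task types below are minimal local stand-ins for illustration; the real implementation would build on the existing TaskPriorityQueue abstractions:

```java
public class ExecutorRouter {

    public interface TaskQueue { void put(Task task); }
    public interface Task { boolean isAsync(); }     // detach-style task whose resources are easy to estimate

    private final TaskQueue hadoopTaskPriorityQueue; // consumed by the Hadoop Executor (one worker per pod)
    private final TaskQueue k8sTaskPriorityQueue;    // consumed by the K8S Executor (one task per pod)

    public ExecutorRouter(TaskQueue hadoopTaskPriorityQueue, TaskQueue k8sTaskPriorityQueue) {
        this.hadoopTaskPriorityQueue = hadoopTaskPriorityQueue;
        this.k8sTaskPriorityQueue = k8sTaskPriorityQueue;
    }

    public void submit(Task task) {
        if (task.isAsync()) {
            k8sTaskPriorityQueue.put(task);      // async / detached: one task per pod
        } else {
            hadoopTaskPriorityQueue.put(task);   // sync / attached: dispatch to a long-lived worker
        }
    }
}
```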

General Design (Not Detailed)

K8S Executor

[diagram: K8S Executor design]

  • With VPA: WIP

  • DS K8S Worker: WIP

  • Pros & Cons

Compound Solution (K8S Executor + Hadoop Executor)

[diagram: compound solution - K8S Executor + Hadoop Executor]

Asynchronous Tasks (Detach)

Synchronous Tasks (Attach)

Maximum Resource Utilization Rate

Elastic Worker

[diagram: elastic worker]

RoadMap & Milestones

  • [diagram: roadmap and milestones]

Sub-Tasks

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

EricGao888 avatar Jan 03 '23 04:01 EricGao888

Thank you for your feedback; we have received your issue. Please wait patiently for a reply.

  • In order for us to understand your request as soon as possible, please provide detailed information, version, or pictures.
  • If you haven't received a reply for a long time, you can join our Slack and send your question to the #troubleshooting channel.

github-actions[bot] avatar Jan 03 '23 04:01 github-actions[bot]

Is it possible to use a simpler way instead of Netty for communication between worker and master in K8S environment? Such as a message queue?

+1. I think if the K8S watcher runs in the master, we can use a local event queue.
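A minimal sketch of that idea, assuming the watcher runs inside the master process (all names hypothetical): the watcher callback publishes pod state changes onto an in-process queue, and the master's existing event loop consumes them, so no Netty callback from the worker is needed:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskPodEventBus {

    public static final class TaskPodEvent {
        final String taskInstanceId;
        final String podPhase;   // e.g. "Running", "Succeeded", "Failed"

        TaskPodEvent(String taskInstanceId, String podPhase) {
            this.taskInstanceId = taskInstanceId;
            this.podPhase = podPhase;
        }
    }

    private final BlockingQueue<TaskPodEvent> events = new LinkedBlockingQueue<>();

    /** Called from the K8S watcher callback thread running inside the master. */
    public void publish(String taskInstanceId, String podPhase) {
        events.offer(new TaskPodEvent(taskInstanceId, podPhase));
    }

    /** Consumed by the master's state-machine / event-handling thread. */
    public TaskPodEvent take() throws InterruptedException {
        return events.take();
    }
}
```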

caishunfeng avatar Jan 03 '23 08:01 caishunfeng

Just some drafts, will redraw later:

K8S Executor

[draft diagram: K8S Executor]

Compound Solution - Hadoop + K8S Executor

[draft diagram: compound solution - Hadoop + K8S Executor]

EricGao888 avatar Jan 04 '23 02:01 EricGao888

For #9337 - KEDA + HPA for worker pod scaling (one worker per pod). PS: this is a solution for Hadoop Executor elasticity, not for the K8S Executor (a metric-exposure sketch follows the diagram):

[diagram: KEDA + HPA worker pod scaling]
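For the queued-plus-running-tasks metric, one possible option (purely illustrative, not a settled design) is to expose it from the master as a Prometheus-format gauge that KEDA's prometheus scaler or an HPA external metric can read; the endpoint, port, and metric name below are hypothetical. The sketch uses only the JDK's built-in HTTP server:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.IntSupplier;

public class WorkerScalingMetricServer {

    /** Expose the number of queued + running tasks so KEDA / HPA can scale worker pods on it. */
    public static void start(int port, IntSupplier queuedAndRunningTasks) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/metrics", exchange -> {
            String body = "ds_queued_and_running_tasks " + queuedAndRunningTasks.getAsInt() + "\n";
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
    }
}
```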

EricGao888 avatar Jan 04 '23 06:01 EricGao888

KEDA: https://github.com/kedacore/keda

EricGao888 avatar Jan 04 '23 06:01 EricGao888

FYI:

  • K8S HPA: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/
  • K8S VPA: https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler

EricGao888 avatar Jan 04 '23 07:01 EricGao888

@MonsterChenzhuo

zhongjiajie avatar Jan 04 '23 12:01 zhongjiajie

Hi, @EricGao888 , I'm interested in this issue. If help is needed, I am willing to participate in it.

rickchengx avatar Jan 04 '23 13:01 rickchengx

Hi, @EricGao888 , I'm interested in this issue. If help is needed, I am willing to participate in it.

@rickchengx Thanks for reaching out! Feel free to participate in the discussions and design : )

EricGao888 avatar Jan 04 '23 13:01 EricGao888

A brief roadmap with several milestones:

[diagram: roadmap with milestones]

Later this week, I will send a proposal email to the dev mailing list : )

EricGao888 avatar Jan 05 '23 03:01 EricGao888

Hi, @EricGao888 , this is promising work! I'm willing to contribute more to it.

There is no perfect solution, only trade-off.

+1. Different users may have different latency and resource-utilization targets: some want to maximize throughput under limited resources, while others want high resource utilization. With the K8S executor, the stateless, one-task-per-pod design makes the resource-utilization target easy to meet, but issues such as container cold start will hurt latency. The problem is exacerbated when task execution time is short; in that case, the Hadoop executor is better.

Radeity avatar Jan 05 '23 03:01 Radeity

I have a question: in the one-pod-per-task scenario, how do we determine the resources for each pod?

liujianmin1024 avatar Nov 28 '23 11:11 liujianmin1024

I will focus on CI / security and have no time to follow up on this one this year. Therefore, I am closing it temporarily.

EricGao888 avatar Mar 27 '24 12:03 EricGao888

I think we can reopen this issue. Anybody who'd like to implement it can leave a message.

davidzollo avatar Apr 08 '24 13:04 davidzollo

I am currently developing an elastic resource component. Technical architecture / principle: resources are handled along two paths - path A for existing resources and path B for pending resources.

a) 4 resource request -> 5a allocate available resources -> 6a request to use resources -> 7a provide resources -> 8a submit task
b) 4 resource request -> 5b allocate pending resources -> 6b request a new worker -> 7b deploy the worker -> 8b register / report resources

Path A allocates existing resources; path B requests new resources. Once a new resource is registered, it becomes an existing resource and is then allocated along path A.

The component provides generic elastic resources, and I plan to integrate it with other components to add elasticity, e.g. DataX, xxl-job, EventBridge, DolphinScheduler. The diagram below shows the development architecture: [diagram: development architecture]

It implements a business resource consumer and a launcher; the diagram below shows the xxl-job integration design: [diagram: xxl-job integration]

The xxl-job master implements the business resource consumer. After sharding, the dispatcher requests resources per shard (group); once the resources are granted, it publishes commands and creates new executors.

leehom avatar Apr 25 '24 08:04 leehom