Support for the execution policy API in JobSet
What would you like to be added:
A new executionPolicy API that allows replicated Jobs to be submitted in order.
When the first replicated Jobs reach the required condition, the next replicated Jobs are created.
Note: complex DAG workflow capability is out of scope for this API, since we don't want to implement workflow functionality as part of this KEP. Users should consider using Argo Workflows or Tekton Pipelines if they need it.
The initial API design:
type JobSetSpec struct {
	ExecutionPolicy *ExecutionPolicy `json:"executionPolicy,omitempty"`
}

type ExecutionPolicy struct {
	// Order in which Jobs will be created. The default is AnyOrder.
	ExecutionPolicyOrder ExecutionPolicyOption `json:"executionPolicyOrder"`
	// After all replicated Jobs reach this status, the JobSet will create the next replicated Jobs.
	ReplicatedJobsStatus ReplicatedJobsStatusOption `json:"replicatedJobsStatus"`
}

type ExecutionPolicyOption string

const (
	AnyOrder ExecutionPolicyOption = "AnyOrder"
	InOrder  ExecutionPolicyOption = "InOrder"
)

type ReplicatedJobsStatusOption string

// We don't add Ready condition here, since users can use the `startupPolicy` API for that.
const (
	ReadyStatus     ReplicatedJobsStatusOption = "Succeeded"
	FailedStatus    ReplicatedJobsStatusOption = "Failed"
	ActiveStatus    ReplicatedJobsStatusOption = "Active"
	SuspendedStatus ReplicatedJobsStatusOption = "Suspended"
)
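A minimal sketch of how the `InOrder` option could gate creation, assuming the controller walks `replicatedJobs` in list order and creates the next entry only once every earlier entry has reached the configured status. The `replicatedJobState` type and the `nextToCreate` helper are hypothetical and exist only for illustration; they are not part of JobSet.

```go
package main

import "fmt"

// replicatedJobState is a hypothetical, simplified view of one
// replicatedJobs entry as the controller might see it.
type replicatedJobState struct {
	Name    string
	Created bool   // whether the child Jobs have been created
	Status  string // e.g. "Succeeded", "Failed", "Active"; empty if not created
}

// nextToCreate returns the index of the next entry to create under an
// InOrder policy, or -1 if nothing can be created yet or all entries exist.
func nextToCreate(jobs []replicatedJobState, required string) int {
	for i, j := range jobs {
		if !j.Created {
			// Every earlier entry reached the required status.
			return i
		}
		if j.Status != required {
			// An earlier entry is still in progress; keep waiting.
			return -1
		}
	}
	return -1
}

func main() {
	jobs := []replicatedJobState{
		{Name: "initializer", Created: true, Status: "Succeeded"},
		{Name: "trainer", Created: true, Status: "Active"},
		{Name: "post-processor"},
	}
	// The trainer is still Active, so nothing new is created yet.
	fmt.Println(nextToCreate(jobs, "Succeeded"))
}
```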
Why is this needed:
More context in this Kubernetes wg-batch thread: https://kubernetes.slack.com/archives/C032ZE66A2X/p1725400839102729
As part of the Kubeflow Training V2 APIs, we want to implement LLM runtimes for LLM fine-tuning: https://github.com/kubeflow/training-operator/issues/2170
That will require JobSet to orchestrate the sequence of 2-3 Jobs: Initializer -> Trainer -> Post-Processor.
Capacity for such a workload should be allocated for all Jobs combined and controlled by Kueue.
When TrainJob is suspended, we will suspend all underlying Jobs.
I think we might have more use cases from the HPC side. Any thoughts @vsoch @alculquicondor?
This enhancement requires the following artifacts:
- [x] Design doc
- [x] API change
- [x] Docs update
The artifacts should be linked in subsequent comments.
cc @tenzen-y @kannon92 @ahg-g @johnugeorge @akshaychitneni @shravan-achar
/kind feature /kind api-change
SGTM!
(The description shouldn’t be a design but I think that we don’t need suspended. And you also declare ReadyStatus for the succeeded state)
This would be great! I agree to not try to create a workflow tool, but allowing this basic dependency structure is something that workflow tools can use. +1 from me.
That sounds like a really cool feature! I'm curious about something though. Currently, in JobSet, we have multiple policies like StartupPolicy and FailurePolicy. As we keep adding more features to our policies, we might want to think about whether we should make them work together or if we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.
I think the feature makes sense; executing "Initializer -> Trainer -> Post-Processor" stages sequentially for LLM fine-tuning is an interesting concrete use case. @andreyvelich can you educate me on why the fine-tuning steps must be decomposed into separate jobs? Why can't the same job perform initialization, training, then post-processing?
As we keep adding more features to our policies, we might want to think about whether we should make them work together or if we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.
@googs1025 we can add validation to the JobSet webhook that ensures the spec configurations are compatible with each other.
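A rough sketch of what such a compatibility check could look like. The types below are simplified stand-ins, not the real JobSet API, and the specific rule (rejecting `InOrder` on both policies at once) is purely an assumption for illustration; the thread does not decide which combinations should be allowed.

```go
package main

import (
	"errors"
	"fmt"
)

// policyOrder and jobSetSpec are hypothetical, simplified stand-ins
// for the JobSet spec types.
type policyOrder string

const (
	anyOrder policyOrder = "AnyOrder"
	inOrder  policyOrder = "InOrder"
)

type jobSetSpec struct {
	StartupPolicyOrder   policyOrder // empty means unset
	ExecutionPolicyOrder policyOrder // empty means unset
}

// validatePolicies rejects one illustrative incompatible combination.
// The rule itself is an assumption, not an agreed design.
func validatePolicies(spec jobSetSpec) error {
	if spec.StartupPolicyOrder == inOrder && spec.ExecutionPolicyOrder == inOrder {
		return errors.New("startupPolicy InOrder and executionPolicy InOrder cannot both be set")
	}
	return nil
}

func main() {
	bad := jobSetSpec{StartupPolicyOrder: inOrder, ExecutionPolicyOrder: inOrder}
	ok := jobSetSpec{StartupPolicyOrder: anyOrder, ExecutionPolicyOrder: inOrder}
	fmt.Println(validatePolicies(bad))
	fmt.Println(validatePolicies(ok))
}
```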
@andreyvelich can you educate me on why the fine-tuning steps must be decomposed into separate jobs?
@danielvegamyhre Sure. Given the size of today's models (e.g. >100b params) and datasets, downloading the pre-trained model and dataset takes time, e.g. 1-2 hours. If we define these initialization steps as an initContainer on the training Node, it will lock the GPUs that we allocate for this training Node.
We don't want to waste GPU resources when we do initialization.
As we keep adding more features to our policies, we might want to think about whether we should make them work together or if we need to set some restrictions (for example, can StartupPolicy and ExecutionPolicy coexist?). This way, users can avoid making a lot of mistakes when using them.
The current JobSet API is alpha. So, I think it would be great to consider how some policies can work together and how we can consolidate some fields into one once we graduate to the beta stage.
This is one of the reasons why we keep parking at the alpha stage, as we discussed in the previous meeting.
If the goal is not to lock the GPUs during other stages, it is important to note what your expectations are for the future when queuing in Kueue. It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.
This would be great! I agree to not try to create a workflow tool, but allowing this basic dependency structure is something that workflow tools can use. +1 from me.
My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.
If the goal is not to lock the GPUs during other stages, it is important to note what your expectations are for the future when queuing in Kueue. It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.
Good point. Actually, I guess that the JobSet already has the same problem in the StartupPolicy. The problem is limited compared with executionPolicy, though. So, I'm wondering if we should investigate a solid approach to support the Jobs with ordering and steps.
My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.
I have the same concern. So, if we introduce this feature, I think we need to design the API so that it prevents conditional parameters. Specifically, the API below has potential extensibility, but I believe that we should not introduce such extensibility.
type ExecutionPolicy struct {
	// Order in which Jobs will be created. The default is AnyOrder.
	ExecutionPolicyOrder ExecutionPolicyOption `json:"executionPolicyOrder"`
	// After all replicated Jobs reach this status, the JobSet will create the next replicated Jobs.
	ReplicatedJobsStatus ReplicatedJobsStatusOption `json:"replicatedJobsStatus"`
}
It sounds like we should be queuing the steps separately, meaning that we need separate "suspend" fields or 3 different objects.
I think that the separate "suspend" field and 3 different objects are hard to maintain and not straightforward. So, I'm leaning toward the JobReadynessGate (current K8s does not support it) since it is a straightforward, intuitive, simple, and generic approach.
My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.
+1
I think that the separate "suspend" field and 3 different objects are hard to maintain and not straightforward.
Are you referring to suspending each of the 3 jobs (Initializer job, Trainer job, Post-Processor job) and resuming them one at a time as the prior job's execution completes?
My concern is where do we stop. You add this feature now, then someone comes and asks "what if we add X", and suddenly we have a workflow tool.
You are starting to sound like me! The answer is that we stop there with a "depends on." If you look at workload managers, it's fairly common to be able to say:
submit job a
submit job b that depends on a
And that's it. There is no further orchestration, suspend or waiting, etc. It's just a flat level depends on that can provide a simple API for actual workflow tools (that represent a dag) to use.
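As a small illustration of that flat "depends on" idea, here is a sketch in which each job names at most one predecessor and the manager only checks whether that predecessor has finished. The `job` type and the `runnable` helper are hypothetical; they do not model any specific workload manager.

```go
package main

import "fmt"

// job is a hypothetical record in a workload manager's queue.
type job struct {
	Name      string
	DependsOn string // name of the job that must finish first; empty if none
	Done      bool
}

// runnable returns the names of jobs that are not done and whose single
// dependency (if any) has completed. There is no further orchestration.
func runnable(jobs []job) []string {
	done := make(map[string]bool, len(jobs))
	for _, j := range jobs {
		done[j.Name] = j.Done
	}
	var out []string
	for _, j := range jobs {
		if j.Done {
			continue
		}
		if j.DependsOn == "" || done[j.DependsOn] {
			out = append(out, j.Name)
		}
	}
	return out
}

func main() {
	jobs := []job{{Name: "a"}, {Name: "b", DependsOn: "a"}}
	fmt.Println(runnable(jobs)) // only "a" can start
	jobs[0].Done = true
	fmt.Println(runnable(jobs)) // now "b" can start
}
```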
You are saying that you want to support arbitrary DAGs already, whereas the initial proposal is just to support a sequence of jobs. That's the kind of leap I was rather against.
An arbitrary DAG is already a workflow manager. Why aren't we using a workflow manager as opposed to adding workflow capabilities to JobSet?
You are saying that you want to support arbitrary DAGs already, whereas the initial proposal is just to support a sequence of jobs. That's the kind of leap I was rather against.
Where did I say that?
The answer is that we stop there with a "depends on."
Unless I misinterpreted what you were trying to say.
Depends on (in and of itself) does not create what most would consider substantial enough for a workflow DAG. It provides a minimal API in the workload manager so that workflow tools can more easily create complex logic.
- https://hpc.nih.gov/docs/job_dependencies.html
- https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_26.html
The workload manager has no knowledge of checking state, submission, or custom action logic beyond the very simple "A depends on B." It's not really enough to be called a workflow but it enables workflow tools to better interact with it. I am absolutely not saying that I want to support arbitrary DAGs. I am pointing out a common pattern in the workload manager ecosystem for HPC that has existed for decades, and made it possible for workflow tools to better integrate.
Do you have a minimal proposal for what jobset could provide? Are you saying that you want to say "jobset X depends on jobset Y"? I ask that because the current proposal is along the lines of "this jobset has multiple steps within it, each of which can be started after the other".
Those are two very different approaches.
No I don’t think depends on would fall on the level of the jobset, more likely the job.
No I don’t think depends on would fall on the level of the jobset, more likely the job.
I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).
No I don’t think depends on would fall on the level of the jobset, more likely the job.
I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).
Exactly, that is what I had in mind when I suggested this in the slack channel, similar to StartupPolicy which is executed at the ReplicatedJob level too. ExecutionPolicy and StartupPolicy are similar, but one is based on readiness status, the other based on completion status.
Also, the suggestion here is not to have an API to explicitly define the chain; it is based on the order in the replicatedJob list, again similar to StartupPolicy.
No I don’t think depends on would fall on the level of the jobset, more likely the job.
I think having the "depends on" relationship operate on the ReplicatedJob level may be better - for example, for something like the "initializer -> trainer -> post-processor" use case described in this issue, I think we may want to have each stage be a separate ReplicatedJob (since for example, the "trainer" stage may be composed of several concurrent jobs running on different groups of infrastructure, but they all depend on the "initializer" stage being completed).
Exactly, that is what I had in mind when I suggested this in the slack channel, similar to StartupPolicy which is executed at the ReplicatedJob level too. ExecutionPolicy and StartupPolicy are similar, but one is based on readiness status, the other based on completion status.
Awesome, well if we are all aligned on this I can implement it.
The only thing we probably want to be configurable is how many entries to run sequentially. The default (expressed as nil) is all, but the user could set it to 1, for example, to run the first entry (which does some initialization for the whole workload) and then let the rest of the entries execute in parallel.
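One way to read that knob, as a sketch only: the first N entries form a sequential prefix, and everything after the prefix starts together once the prefix has finished. The `canStart` helper and its parameters are hypothetical names, and `completed` is assumed to count the leading entries that have already finished in list order.

```go
package main

import "fmt"

// canStart reports whether the entry at index i may start, given how many
// leading entries have completed. sequential is the hypothetical knob:
// nil means every entry runs in sequence.
func canStart(i, total, completed int, sequential *int) bool {
	n := total
	if sequential != nil && *sequential < total {
		n = *sequential
	}
	if i < n {
		// Sequential prefix: entry i waits for entries 0..i-1.
		return completed >= i
	}
	// Remaining entries run in parallel once the whole prefix is done.
	return completed >= n
}

func main() {
	one := 1
	// Three entries, first one finished, only the first is sequential.
	for i := 0; i < 3; i++ {
		fmt.Println(i, canStart(i, 3, 1, &one))
	}
}
```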
How should we calculate quota usage for such a sequence?
cc @mimowo @mwielgus
Yes, this proposal is only for a Job sequence, and as @vsoch mentioned, this concept has existed in the HPC space for a long time.
Awesome, well if we are all aligned on this I can implement it.
@ahg-g @danielvegamyhre Do we need a small KEP for it, or can we jump directly to the implementation?
How should we calculate quota usage for such a sequence?
@alculquicondor I think in the first iteration we should calculate the quota as for a normal JobSet (e.g. admit the JobSet if all replicated Jobs' resources are available). However, it would be nice to admit/suspend steps in the Job sequence separately. But I feel that we should discuss it after we integrate Argo Workflows into Kueue: https://github.com/kubernetes-sigs/kueue/pull/2976 @tenzen-y Any thoughts?
@ahg-g @danielvegamyhre Do we need a small KEP for it, or can we jump directly to the implementation?
We should have a small KEP for this.
I think in the first iteration we should calculate the quota as for a normal JobSet (e.g. admit the JobSet if all replicated Jobs' resources are available). However, it would be nice to admit/suspend steps in the Job sequence separately.
I agree that we can leave implementation details for a second iteration. However, there are too many moving pieces here to completely ignore the question until later. We at least need a high level plan of how it's going to work. Users will be very surprised that Kueue needs to reserve space for all of the steps of the sequence.
But I feel that we should discuss it after we integrate Argo Workflow into Kueue:
Not really. We need to figure out if the proposed design for Argo will also work for Jobset. I don't think it currently does, as the proposal relies on the pod group integration, which wouldn't be ideal for jobset.
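To make the quota concern above concrete, here is a small sketch comparing the two accounting options discussed: admitting the whole JobSet at once (reserve the sum of all steps) versus admitting steps one at a time (only the largest single step is held at any moment). The `step` type, both helpers, and the GPU counts are made-up assumptions for illustration and say nothing about how Kueue actually computes usage.

```go
package main

import "fmt"

// step is a hypothetical stage in the sequence with a made-up GPU request.
type step struct {
	Name string
	GPUs int
}

// wholeSetQuota models reserving capacity for every step up front.
func wholeSetQuota(steps []step) int {
	total := 0
	for _, s := range steps {
		total += s.GPUs
	}
	return total
}

// perStepPeak models admitting one step at a time: the most that is
// ever held is the largest single step.
func perStepPeak(steps []step) int {
	peak := 0
	for _, s := range steps {
		if s.GPUs > peak {
			peak = s.GPUs
		}
	}
	return peak
}

func main() {
	steps := []step{
		{Name: "initializer", GPUs: 0},
		{Name: "trainer", GPUs: 8},
		{Name: "post-processor", GPUs: 1},
	}
	fmt.Println(wholeSetQuota(steps), perStepPeak(steps))
}
```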
/assign @andreyvelich