The Job condition transitions are not clear and have bugs
There are 5 JobConditionTypes (Created, Running, Restarting, Succeeded, Failed) now, so it behaves like a state machine. The flow of these conditions is roughly shown below, and there are some questions we need to make clear. For example,
- Can a Job go from Created to Failed directly?
- Must a Job always have the Created condition? What if the Job's spec fails validation?
- ......
```
                          |------> Succeeded
                          |
Created -----> Running ---|
   |              ⬆️      |
   |              |       |------> Failed
   |              |
   |              |
   ⬇️             |
   Restarting-----|
```
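To make this easier to discuss, here is the same flow written as a small strawman table in Python (it mirrors the sketch above and the table below, not what the operator actually implements):

```python
# Strawman encoding of the flow drawn above (plus the Running -> Restarting
# edge from the table below). It is NOT what the operator implements today;
# it only exists to make the open questions concrete, e.g. whether a
# Created -> Failed edge should be added for spec-validation errors.
ALLOWED_TRANSITIONS = {
    "Created":    {"Running", "Restarting"},
    "Running":    {"Succeeded", "Failed", "Restarting"},
    "Restarting": {"Running"},
    "Succeeded":  set(),  # terminal
    "Failed":     set(),  # terminal
}


def is_valid_transition(current: str, new: str) -> bool:
    """True if moving from `current` to `new` follows the strawman table."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```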
Besides, users always want to know under which conditions a state transition happens (yes, this is the state transition table). We also need to make the state transition table clear. It can help users understand what is going on and guide developers in writing correct code.
| Workload | Created/Restarting -> Running | Running -> Restarting | Running -> Failed | Running -> Succeeded |
|---|---|---|---|---|
| MPIJob | xxx | yyy | zzz | kkk |
| PyTorchJob | xxx | yyy | zzz | kkk |
| TFJob | xxx | yyy | zzz | kkk |
Why this is important, shown by a real case: use the following YAML to create a PyTorchJob (the Worker image is wrong). The worker pods will never run at all, but the PyTorchJob still gets the JobRunning condition.
```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-test-condition
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
            - name: pytorch
              image: alpine:latest
              command: ["sleep", "365d"]
    Worker:
      replicas: 2
      template:
        spec:
          containers:
            - name: pytorch
              image: alpine:latestxx
              command: ["sleep", "365d"]
```

Referring to point 3 of https://github.com/kubeflow/training-operator/issues/1703:
AFAIK, it was a conscious decision to mark the job Running when at least one pod is in the running state.
Others can add /cc @kubeflow/wg-training-leads @gaocegege @zw0610
First of all, this discussion is long overdue and quite important.
Here are my thoughts:
- Instead of defining possible transitions from one condition to another, why don't we just define the meaning of the conditions `Created`, `Running`, etc.? In this way, questions like "Can a Job go from Created to Failed directly?" can be answered more clearly.
- For the `Succeeded`, `Running` and `Failed` conditions, different jobs may have different standards, especially when elastic mode is introduced. But for `Created` and `Restarting`, jobs shall share a universal definition.
- Regarding the PyTorchJob condition problem, the answer depends on how we define `Running`. If `Running` is defined as "as long as at least one Pod is in Running status", then the contemporary job condition is correct. If `Running` is defined as "all desired Pods are in Running status", then the condition in the screenshot is wrong.
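To make the two candidate definitions concrete, here is a minimal sketch that looks only at pod phases (a deliberate simplification):

```python
# Two candidate meanings of the Running condition, based only on pod phases
# (a deliberate simplification of what the controller actually tracks).
def running_if_any(pod_phases: list) -> bool:
    # Current behaviour: at least one pod is Running.
    return any(p == "Running" for p in pod_phases)


def running_if_all(pod_phases: list, desired: int) -> bool:
    # Stricter reading: all desired pods are Running.
    return len(pod_phases) == desired and all(p == "Running" for p in pod_phases)


# The broken-image PyTorchJob above: master Running, two workers stuck Pending.
phases = ["Running", "Pending", "Pending"]
print(running_if_any(phases))      # True  -> the job is marked Running today
print(running_if_all(phases, 3))   # False -> the job would not be Running
```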
@zw0610 👍 I agree with you. We should first clearly define the meaning of the Created, Running, etc. conditions.
I used the following PyTorch code to do the tests:
```python
import argparse
import os
import torch
import timeit
import numpy as np
import torch.distributed as dist
import torch.optim as optim
import torch.nn.functional as F
from torchvision import models
from torch.nn.parallel import DistributedDataParallel as DDP


def spmd_main(args, data, target):
    # (1) call dist.init_process_group()
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE", "LOCAL_RANK")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    print(
        f"[{os.getpid()}]: world_size = {dist.get_world_size()}, "
        + f"local_rank = {args.local_rank}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()} \n", end=''
    )

    # (2) get local_rank
    if 'LOCAL_RANK' in os.environ:
        local_rank = int(os.environ["LOCAL_RANK"])
        log('LOCAL_RANK env is not none, use env, the local_rank is %d' % local_rank)
    else:
        local_rank = args.local_rank
        log('LOCAL_RANK env is none, use args, the local_rank is %d' % local_rank)
    world_size = dist.get_world_size()
    rank = dist.get_rank()

    # (3) construct model, ddp_model, optimizer
    model = getattr(models, 'resnet50')().to(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank],
                    output_device=local_rank)
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    data = data.to(local_rank)      # move the synthetic batch to the same GPU as the model
    target = target.to(local_rank)

    # (4) call benchmark_step for warm up
    time = timeit.timeit(lambda: benchmark_step(ddp_model, optimizer, data, target),
                         number=args.num_batches_per_iter)

    # (5) benchmark testing
    img_secs = []
    for x in range(args.num_iters):
        time = timeit.timeit(lambda: benchmark_step(ddp_model, optimizer, data, target),
                             number=args.num_batches_per_iter)
        img_sec = args.batch_size * args.num_batches_per_iter / time
        log('Iter #%d, rank %s: %.1f img/sec per %s' % (x, rank, img_sec, 'gpu'))
        img_secs.append(img_sec)

    # (6) calculate the result of step (5)
    img_sec_mean = np.mean(img_secs)
    img_sec_conf = 1.96 * np.std(img_secs)
    log('Img/sec per %s: %.1f +-%.1f' % ('gpu', img_sec_mean, img_sec_conf))
    log('Total img/sec on %d %s(s): %.1f +-%.1f'
        % (world_size, 'gpu', world_size * img_sec_mean,
           world_size * img_sec_conf))

    # (7) destroy the process group
    dist.destroy_process_group()


def benchmark_step(ddp_model, optimizer, data, target):
    optimizer.zero_grad()
    output = ddp_model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()


def log(s, nl=True):
    print(s, end='\n' if nl else '')


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--local_rank", type=int, default=0)
    parser.add_argument("--local_world_size", type=int, default=1)
    parser.add_argument('--batch-size', type=int, default=32,
                        help='input batch size')
    parser.add_argument('--num-iters', type=int, default=10,
                        help='number of benchmark iterations')
    parser.add_argument('--num-batches-per-iter', type=int, default=10,
                        help='number of batches per benchmark iteration')
    args = parser.parse_args()

    data = torch.randn(args.batch_size, 3, 224, 224)
    target = torch.LongTensor(args.batch_size).random_() % 1000

    # The main entry point is called directly without using subprocess
    spmd_main(args, data, target)
```
These two pictures are about a PyTorchJob with 1 master and 2 workers; the image is `pytorch/pytorch:1.8.1-cuda11.1-cudnn8-runtime`, and every pod's command is `sleep 365d`. I exec into each pod with `kubectl exec -it xxx -- bash` and then did what the pictures show.
I think I can draw a conclusion: in PyTorch 1.8.1, a PyTorchJob only actually trains when all pods/processes are running, and when any pod/process fails, the training stops.


These two pictures are about a PyTorchJob with 1 master and 2 workers; the image is `pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime`, and every pod's command is `sleep 365d`. I exec into each pod with `kubectl exec -it xxx -- bash` and then did what the pictures show.
I think I can draw a conclusion: in PyTorch 1.12.1, in non-elastic mode, a PyTorchJob only actually trains when all pods/processes are running, and when any pod/process fails, the training stops. This matches PyTorch 1.8.1.


This picture is about a PyTorchJob with 1 master and 2 workers; the image is `pytorch/pytorch:1.12.1-cuda11.3-cudnn8-runtime`, and every pod's command is `sleep 365d`. I exec into each pod with `kubectl exec -it xxx -- bash` and then did what the picture shows.
I think I can draw a conclusion: in PyTorch 1.12.1, in elastic mode, a PyTorchJob can actually train once the minimum pod/node requirement is met, and one failed pod will not cause the task to fail.

Besides the tests above, I did several other tests, and I think they support the following conclusions.
@johnugeorge @zw0610 what do you think?
| Workload | Created | Running | Restarting | Failed | Succeeded |
|---|---|---|---|---|---|
| PyTorchJob | xxx | In non-elastic mode, when all pods are running. In elastic mode, when the number of running pods meets the minimum `--nnodes` requirement | yyy | In non-elastic mode, when any one pod fails. In elastic mode, when the restart count meets the `--max_restarts` requirement, or all pods have failed (in PyTorch 1.12.1, when I set `--max_restarts=1`, after worker0 and then worker1 failed, the `--max_restarts=1` requirement was fulfilled and the job should be considered failed, but the master pod/process does not exit; it blocks, which makes it hard for the operator to detect that the job has failed) | In non-elastic mode, when the master pod completes successfully. In elastic mode, when any pod completes successfully |
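Written out as a small Python sketch so the wording in the table is unambiguous (all helper names and parameters are mine, purely for illustration; `min_nnodes` and `max_restarts` stand for the torchrun `--nnodes` minimum and `--max_restarts` values):

```python
# Sketch of the PyTorchJob rules from the table above. All helper names and
# parameters are illustrative; this is not operator code.

def is_running(running_pods: int, desired_pods: int,
               elastic: bool, min_nnodes: int) -> bool:
    if elastic:
        # Elastic mode: enough pods to satisfy the minimum of --nnodes.
        return running_pods >= min_nnodes
    # Non-elastic mode: every desired pod is running.
    return running_pods == desired_pods


def is_failed(failed_pods: int, desired_pods: int, restarts: int,
              elastic: bool, max_restarts: int) -> bool:
    if elastic:
        # Elastic mode: the restart budget (--max_restarts) is used up,
        # or every pod has failed.
        return restarts >= max_restarts or failed_pods == desired_pods
    # Non-elastic mode: any single pod failure fails the job.
    return failed_pods > 0


def is_succeeded(master_succeeded: bool, any_pod_succeeded: bool,
                 elastic: bool) -> bool:
    # Non-elastic mode: the master pod completed successfully.
    # Elastic mode: any pod completed successfully.
    return any_pod_succeeded if elastic else master_succeeded
```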
Thanks @HeGaoYuan for the tests.
From the list, I see that these items are missing now:
- The elastic mode check is not added yet.
- For Running, we are not considering all pods.
Anything else?
> Thanks @HeGaoYuan for the tests.
> From the list, I see that these items are missing now:
> - The elastic mode check is not added yet.
> - For Running, we are not considering all pods.
>
> Anything else?
Yes, you are right, nothing more for now.
But I think the most important thing is to discuss the definitions of the statuses, as @zw0610 said, and make the definitions correct and clear. Then we can revise the code to follow the definitions.
The list I have written may not be 100% correct. For example, for PyTorchJob, could the definition of the Succeeded status simply be "in both elastic and non-elastic mode, when the master pod completes successfully"?
And I missed the definitions of Created and Restarting.
For the definition of the Created status, I think the most important question is what we should do when Job validation fails. I think there may be 3 solutions, as the pictures show. I prefer solution a or c. BTW, https://github.com/kubeflow/training-operator/pull/1705 is also related here.
For the Restarting status, I think it is very hard to give a definition; I need more time to state it clearly.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
/remove-lifecycle stale
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.