[Bug] KubeRay keeps updating the RayJob CR when an external finalizer is added to it
### Search before asking
- [X] I searched the issues and found no similar issues.
### KubeRay Component

ray-operator
### What happened + What you expected to happen
I'm trying to have my controller watch the RayJob CR. I added another finalizer to the RayJob CR so that, after the RayJob is deleted, my controller can still observe its status. But when I try to remove my finalizer from the RayJob CR, I keep receiving a conflict error: `Operation cannot be fulfilled on rayjobs.ray.io "mlrayjob-1-ray-job": the object has been modified; please apply your changes to the latest version and try again`
I checked KubeRay's log:

```
2023-11-05T03:56:59.654Z INFO controllers.RayJob reconciling RayJob {"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:56:59.654Z INFO controllers.RayJob RayJob is being deleted {"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:56:59.654Z INFO controllers.RayJob Remove the finalizer no matter StopJob() succeeds or not. {"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:02.671Z INFO controllers.RayJob reconciling RayJob {"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:02.672Z INFO controllers.RayJob RayJob is being deleted {"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:02.672Z INFO controllers.RayJob Remove the finalizer no matter StopJob() succeeds or not. {"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:05.747Z INFO controllers.RayJob reconciling RayJob {"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:05.747Z INFO controllers.RayJob RayJob is being deleted {"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:05.747Z INFO controllers.RayJob Remove the finalizer no matter StopJob() succeeds or not. {"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:08.769Z INFO controllers.RayJob reconciling RayJob {"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:08.769Z INFO controllers.RayJob RayJob is being deleted {"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:08.769Z INFO controllers.RayJob Remove the finalizer no matter StopJob() succeeds or not. {"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:11.790Z INFO controllers.RayJob reconciling RayJob {"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:11.791Z INFO controllers.RayJob RayJob is being deleted {"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:11.791Z INFO controllers.RayJob Remove the finalizer no matter StopJob() succeeds or not. {"finalizer": "ray.io/rayjob-finalizer"}
```
As you can see, the KubeRay controller keeps reconciling and updating the RayJob CR even though nothing has changed. Every update bumps the `resourceVersion`, so other controllers have to patch the CR to remove their own finalizers, or they will hit conflict errors.
Here is the related code:

```go
r.Log.Info("Remove the finalizer no matter StopJob() succeeds or not.", "finalizer", common.RayJobStopJobFinalizer)
controllerutil.RemoveFinalizer(rayJobInstance, common.RayJobStopJobFinalizer)
err := r.Update(ctx, rayJobInstance)
if err != nil {
	r.Log.Error(err, "Failed to remove finalizer for RayJob")
	return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
```
Maybe we can add a diff check, so the controller won't update the resource when the finalizer has already been removed.
### Reproduction script
```yaml
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  finalizers:
    - customtask.xxx.xxx/finalizers
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  rayClusterSpec:
    rayVersion: '2.7.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-ml:2.7.0-gpu
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  nvidia.com/gpu: "4"
                  cpu: "54"
                  memory: "54Gi"
                requests:
                  nvidia.com/gpu: "4"
                  cpu: "54"
                  memory: "54Gi"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          nodeSelector:
            cloud.google.com/gke-accelerator: nvidia-tesla-t4
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py

###################### Ray code #################################
# This sample is from https://docs.ray.io/en/latest/data/examples/huggingface_vit_batch_prediction.html
# It is mounted into the container.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray

    s3_uri = "s3://anonymous@air-example-data-2/imagenette2/val/"

    ds = ray.data.read_images(
        s3_uri, mode="RGB"
    )
    ds

    from typing import Dict
    import numpy as np

    from transformers import pipeline
    from PIL import Image

    BATCH_SIZE = 16

    class ImageClassifier:
        def __init__(self):
            # If doing CPU inference, set `device="cpu"` instead.
            self.classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0)

        def __call__(self, batch: Dict[str, np.ndarray]):
            # Convert the numpy array of images into a list of PIL images,
            # which is the format the HF pipeline expects.
            outputs = self.classifier(
                [Image.fromarray(image_array) for image_array in batch["image"]],
                top_k=1,
                batch_size=BATCH_SIZE)

            # `outputs` is a list of length-one lists. For example:
            # [[{'score': '...', 'label': '...'}], ..., [{'score': '...', 'label': '...'}]]
            batch["score"] = [output[0]["score"] for output in outputs]
            batch["label"] = [output[0]["label"] for output in outputs]
            return batch

    predictions = ds.map_batches(
        ImageClassifier,
        compute=ray.data.ActorPoolStrategy(size=4),  # Change this number based on the number of GPUs in your cluster.
        num_gpus=1,  # Specify 1 GPU per model replica.
        batch_size=BATCH_SIZE  # Use the largest batch size that can fit on our GPUs.
    )

    prediction_batch = predictions.take_batch(5)

    from PIL import Image
    print("A few sample predictions: ")
    for image, prediction in zip(prediction_batch["image"], prediction_batch["label"]):
        img = Image.fromarray(image)
        # Display the image
        img.show()
        print("Label: ", prediction)

    # Write to local disk, or external storage, e.g. S3
    # ds.write_parquet("s3://my_bucket/my_folder")
```
### Anything else
No response
### Are you willing to submit a PR?
- [X] Yes I am willing to submit a PR!