kuberay icon indicating copy to clipboard operation
kuberay copied to clipboard

[Bug] KubeRay keep update RayJob CR when external Finalizer added to it.

Open Irvingwangjr opened this issue 2 years ago • 0 comments

Search before asking

  • [X] I searched the issues and found no similar issues.

KubeRay Component

ray-operator

What happened + What you expected to happen

I'm trying to have my controller watch RayJob CR. I added another finalizer to RayJob CR, so after deleting RayJob, my controller can still observe the status of RayJob. But when I try to remove my finalizer from RayJob CR, I kept receiving conflict error: error":"Operation cannot be fulfilled on rayjobs.ray.io "mlrayjob-1-ray-job": the object has been modified; please apply your changes to the latest version and try again"

I check Kuberay's log:

2023-11-05T03:56:59.654Z	INFO	controllers.RayJob	reconciling RayJob	{"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:56:59.654Z	INFO	controllers.RayJob	RayJob is being deleted	{"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:56:59.654Z	INFO	controllers.RayJob	Remove the finalizer no matter StopJob() succeeds or not.	{"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:02.671Z	INFO	controllers.RayJob	reconciling RayJob	{"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:02.672Z	INFO	controllers.RayJob	RayJob is being deleted	{"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:02.672Z	INFO	controllers.RayJob	Remove the finalizer no matter StopJob() succeeds or not.	{"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:05.747Z	INFO	controllers.RayJob	reconciling RayJob	{"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:05.747Z	INFO	controllers.RayJob	RayJob is being deleted	{"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:05.747Z	INFO	controllers.RayJob	Remove the finalizer no matter StopJob() succeeds or not.	{"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:08.769Z	INFO	controllers.RayJob	reconciling RayJob	{"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:08.769Z	INFO	controllers.RayJob	RayJob is being deleted	{"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:08.769Z	INFO	controllers.RayJob	Remove the finalizer no matter StopJob() succeeds or not.	{"finalizer": "ray.io/rayjob-finalizer"}
2023-11-05T03:57:11.790Z	INFO	controllers.RayJob	reconciling RayJob	{"NamespacedName": "mlplatform-customtask/mlrayjob-1-ray-job"}
2023-11-05T03:57:11.791Z	INFO	controllers.RayJob	RayJob is being deleted	{"DeletionTimestamp": "2023-11-05 03:40:40 +0000 UTC"}
2023-11-05T03:57:11.791Z	INFO	controllers.RayJob	Remove the finalizer no matter StopJob() succeeds or not.	{"finalizer": "ray.io/rayjob-finalizer"}

as you can see, kuberay controller keep doing reconcile, updating the RayJob CR, even there is no changes. That will change the resourceVersion and other controller have to patch the cr to remove other finalizer, or they will have conflict error.

Here is the related code:

		r.Log.Info("Remove the finalizer no matter StopJob() succeeds or not.", "finalizer", common.RayJobStopJobFinalizer)
		controllerutil.RemoveFinalizer(rayJobInstance, common.RayJobStopJobFinalizer)
		err := r.Update(ctx, rayJobInstance)
		if err != nil {
			r.Log.Error(err, "Failed to remove finalizer for RayJob")
			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
		}

Maybe we can make a diff operation, so controller won't update resource when finalizer have been removed.

Reproduction script

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  finalizers:
  - customtask.xxx.xxx/finalizers
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  rayClusterSpec:
    rayVersion: '2.7.0'
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray-ml:2.7.0-gpu
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  nvidia.com/gpu: "4"
                  cpu: "54"
                  memory: "54Gi"
                requests:
                  nvidia.com/gpu: "4"
                  cpu: "54"
                  memory: "54Gi"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          nodeSelector:
            cloud.google.com/gke-accelerator: nvidia-tesla-t4
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py

######################Ray code #################################
# This sample is from https://docs.ray.io/en/latest/data/examples/huggingface_vit_batch_prediction.html
# It is mounted into the container.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray

    s3_uri = "s3://anonymous@air-example-data-2/imagenette2/val/"

    ds = ray.data.read_images(
        s3_uri, mode="RGB"
    )
    ds
    from typing import Dict
    import numpy as np

    from transformers import pipeline
    from PIL import Image

    BATCH_SIZE = 16

    class ImageClassifier:
        def __init__(self):
            # If doing CPU inference, set `device="cpu"` instead.
            self.classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0)

        def __call__(self, batch: Dict[str, np.ndarray]):
            # Convert the numpy array of images into a list of PIL images which is the format the HF pipeline expects.
            outputs = self.classifier(
                [Image.fromarray(image_array) for image_array in batch["image"]], 
                top_k=1, 
                batch_size=BATCH_SIZE)
            
            # `outputs` is a list of length-one lists. For example:
            # [[{'score': '...', 'label': '...'}], ..., [{'score': '...', 'label': '...'}]]
            batch["score"] = [output[0]["score"] for output in outputs]
            batch["label"] = [output[0]["label"] for output in outputs]
            return batch

    predictions = ds.map_batches(
        ImageClassifier,
        compute=ray.data.ActorPoolStrategy(size=4), # Change this number based on the number of GPUs in your cluster.
        num_gpus=1, # Specify 1 GPU per model replica.
        batch_size=BATCH_SIZE # Use the largest batch size that can fit on our GPUs
    )

    prediction_batch = predictions.take_batch(5)

    from PIL import Image
    print("A few sample predictions: ")
    for image, prediction in zip(prediction_batch["image"], prediction_batch["label"]):
        img = Image.fromarray(image)
        # Display the image
        img.show()
        print("Label: ", prediction)

    # Write to local disk, or external storage, e.g. S3
    # ds.write_parquet("s3://my_bucket/my_folder")

Anything else

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Irvingwangjr avatar Nov 05 '23 04:11 Irvingwangjr