
Running KFP with Codeflare SDK

blublinsky opened this issue 2 years ago · 2 comments

We finally got this working, but I do not think the approach is sustainable for a wider audience. Here is what we had to do:

  1. Because the Codeflare SDK relies on the oc CLI, we had to create a new image for KFP execution:
FROM python:3.8.16-slim-bullseye

RUN apt update && apt install -y wget
# install oc
RUN mkdir /opt/oc
RUN wget -O /opt/oc/release.tar.gz  https://mirror.openshift.com/pub/openshift-v4/x86_64/clients/ocp/stable-4.11/openshift-client-linux-4.11.40.tar.gz
RUN tar -xzvf  /opt/oc/release.tar.gz -C /opt/oc/ && \
    mv /opt/oc/oc /usr/bin/ && \
    rm -rf /opt/oc

# install libraries
RUN pip install --upgrade pip && pip install codeflare-sdk
RUN pip install "ray[default]"==2.1.0

# Allow writes for the YAML files the SDK generates
RUN chmod -R 777 /tmp

Note that we also had to make /tmp writable so the SDK can save its intermediate YAML files.

  2. Because the Codeflare SDK directly manipulates MCAD, RayCluster, and OpenShift Route resources, we had to grant additional permissions to the pipeline-runner service account, which should eventually go into the KFDef. Here are the files:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: kfp-openshift-route
rules:
  - apiGroups: ["route.openshift.io"]
    resources: ["routes", "routes/custom-host"]
    verbs:  ["create", "get", "list", "patch", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-mcad
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mcad-mcad-controller-role
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-ray
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mcad-controller-ray-clusterrole
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pipeline-runner-binding-route
  namespace: odh-applications
subjects:
  - kind: ServiceAccount
    name: pipeline-runner
    namespace: odh-applications
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: kfp-openshift-route
  3. To make sure that we have permission to write files, we had to redirect the working directory. Here is the complete code:
import kfp.components as comp
from kfp_tekton.compiler import TektonCompiler
import kfp.dsl as dsl
from kubernetes import client as k8s_client

# execute ray pipeline
def execute_ray_pipeline(token: str,                 # token to authenticate to cluster
                         name: str,                  # name of Ray cluster
                         min_worker: str,            # min number of workers
                         max_worker: str,            # max number of workers
                         min_cpus: str,              # min cpus per worker
                         max_cpus: str,              # max cpus per worker
                         min_memory: str,            # min memory per worker
                         max_memory: str,            # max memory per worker
                         image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103"
                      ):
    # Ray code - basically hello world
    import ray
    @ray.remote
    class Counter:
        def __init__(self):
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return self.counter

    # Import pieces from codeflare-sdk
    from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration, list_all_clusters, list_all_queued
    from codeflare_sdk.cluster.auth import TokenAuthentication
    import os

    # get current namespace
    ns = os.getenv('NAMESPACE', 'default')
    # change the current directory to ensure that we can write
    os.chdir("/tmp")

    print(f"Executing in namespace {ns}, current working directory is {os.getcwd()}")

    # Create authentication object for oc user permissions
    auth = TokenAuthentication(
        token=token,
        server="https://kubernetes.default:443",
        skip_tls=True
    )
    try:
        auth.login()
    except Exception as e:
        print(f"Failed to log into openshift cluster, error {e}. Please check token/server values provided")
        os.abort()
    print("successfully logged in")
    # Create and configure our cluster object (and appwrapper)
    cluster = Cluster(ClusterConfiguration(
        name = name,
        namespace = ns,
        min_worker = int(min_worker),
        max_worker = int(max_worker),
        min_cpus = int(min_cpus),
        max_cpus = int(max_cpus),
        min_memory = int(min_memory),
        max_memory = int(max_memory),
        gpu=0,
        image = image,
        instascale=False
    ))
    print(f"Configuration for Ray cluster {name} in namespace {ns} is created")

    # bring up the cluster
    cluster.up()
    print(f"Creating Ray cluster {name} in namespace {ns}...")

    # and wait for it to come up
    cluster.wait_ready()
    rc = cluster.details(print_to_console=False)
    print("Ray cluster is ready")
    print(rc)

    # Get cluster connection points
    ray_cluster_uri = cluster.cluster_uri()
    print(f"Ray_cluster is at {ray_cluster_uri}")

    # Connect to the cluster
    try:
        ray.init(address=f'{ray_cluster_uri}', ignore_reinit_error=True)
    except Exception as e:
        print(f"Failed to connect to Ray cluster, error {e}")
        os.abort()
    print("connected to Ray cluster")

    # execute Ray function
    print("Running Hello world")
    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # delete cluster
    print("All done. Cleaning up")
    cluster.down()

# components
ray_pipeline_op = comp.func_to_container_op(
    func=execute_ray_pipeline,
    base_image="blublinsky1/kfp-oc:0.0.2"
)

# Pipeline to invoke execution on remote resource
@dsl.pipeline(
    name='simple-ray-pipeline',
    description='Pipeline to show how to use codeflare sdk to create Ray cluster and run jobs'
)
def simple_ray_pipeline(token: str,                 # token to authenticate to cluster
                        name: str = "kfp-ray",      # name of Ray cluster
                        min_worker: str = "2",      # min number of workers
                        max_worker: str = "2",      # max number of workers
                        min_cpus: str = "2",        # min cpus per worker
                        max_cpus: str = "2",        # max cpus per worker
                        min_memory: str = "4",      # min memory per worker
                        max_memory: str = "4",      # max memory per worker
                        image: str = "ghcr.io/foundation-model-stack/base:ray2.1.0-py38-gpu-pytorch1.12.0cu116-20221213-193103"
                        ):

    # invoke pipeline
    pipeline = ray_pipeline_op(token, name, min_worker, max_worker, min_cpus, max_cpus, min_memory,
                               max_memory, image)
    pipeline.execution_options.caching_strategy.max_cache_staleness = "P0D"
    pipeline.add_env_variable(k8s_client.V1EnvVar(
        name='NAMESPACE',
        value_from=k8s_client.V1EnvVarSource(
            field_ref=k8s_client.V1ObjectFieldSelector(field_path="metadata.namespace"))))


if __name__ == '__main__':
    # Compiling the pipeline

    TektonCompiler().compile(simple_ray_pipeline, __file__.replace('.py', '.yaml'))
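
For completeness, here is a hedged sketch of how the compiled YAML could then be submitted to a KFP Tekton endpoint; the route URL, token, experiment name, and output filename are illustrative assumptions, not part of our setup:

# Hypothetical submission of the compiled pipeline package.
from kfp_tekton import TektonClient

client = TektonClient(
    host="https://ds-pipeline-route.example.com",   # assumed KFP Tekton API route
    existing_token="<openshift-user-token>",        # assumed bearer token for the route
)
client.create_run_from_pipeline_package(
    "simple_ray_pipeline.yaml",                     # YAML produced by the compile step above
    arguments={"token": "<openshift-user-token>"},  # the pipeline's required parameter
    experiment_name="codeflare-kfp",
)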

After all this, the workflow runs correctly. We still need to add an implementation based on the Ray job submission REST API: https://docs.ray.io/en/latest/cluster/running-applications/job-submission/rest.html
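
For reference, a minimal, untested sketch of what that REST-based submission could look like (the dashboard URL, entrypoint, and service name below are placeholders, and the response field name differs between Ray versions):

import time
import requests

dashboard_url = "http://kfp-ray-head-svc:8265"  # placeholder for the Ray dashboard endpoint

# Submit a job through the Ray job submission REST API.
resp = requests.post(
    f"{dashboard_url}/api/jobs/",
    json={"entrypoint": "python -c \"print('hello from the Ray cluster')\""},
)
resp.raise_for_status()
body = resp.json()
# Newer Ray versions return "submission_id"; older ones return "job_id".
submission_id = body.get("submission_id") or body.get("job_id")

# Poll the job status until it reaches a terminal state.
while True:
    status = requests.get(f"{dashboard_url}/api/jobs/{submission_id}").json()["status"]
    print(f"Job {submission_id} status: {status}")
    if status in ("SUCCEEDED", "FAILED", "STOPPED"):
        break
    time.sleep(5)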

blublinsky commented on May 26, 2023

Since a component is already running in the cluster, do we still need TokenAuthentication? Can one invoke load_incluster_config instead?

from kubernetes import client, config
loadedconf = config.load_incluster_config()

If TokenAuthentication is required, how does one update a token in a scheduled operational environment? Does it need to be compiled in also?

# Create authentication object for oc user permissions
auth = TokenAuthentication(
    token=token,
    server="https://kubernetes.default:443",
    skip_tls=True
)
try:
    auth.login()

yuanchi2807 commented on May 31, 2023

The following works:

# Create authentication object for oc user permissions
with open("/var/run/secrets/kubernetes.io/serviceaccount/token", "r") as file:
    token = file.read().rstrip()
auth = TokenAuthentication(token=token, server="https://kubernetes.default:443", skip_tls=True)
try:
    auth.login()

roytman commented on Jun 24, 2023