argo-python-dsl icon indicating copy to clipboard operation
argo-python-dsl copied to clipboard

Submit fails with "argo.workflows.client.rest.ApiException: (400)"

Open xxlest opened this issue 5 years ago • 6 comments

Describe the bug: I tried to submit an Argo workflow from Python using the "hello-world" DSL example, but it failed. Could you help me have a look? Python code:

from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
import yaml

from argo.workflows.dsl.templates import V1Container


# Minimal "hello world" workflow: a single template named `whalesay`,
# which is also declared as the workflow entrypoint.
class HelloWorld(Workflow):

    entrypoint = "whalesay"

    # Returns the container spec for the template: the docker/whalesay image
    # running `cowsay` with the argument "hello world".
    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container

# Instantiate the workflow and print it for inspection.
wf = HelloWorld()
print(wf)

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

load_kube_config()  # loads local configuration from ~/.kube/config
v1alpha1 = V1alpha1Api()

# Listing the workflows in the "default" namespace works for the reporter.
wfs = v1alpha1.list_namespaced_workflows(namespace="default")
print(wfs)

#v1alpha1.create_namespaced_workflow("default", wf)
# This is the call reported to fail with ApiException (400).
wf.submit(client=V1alpha1Api(), namespace="default")

I can list the workflows (wfs) with the client and print them, but submit fails and throws a 400 exception. Screenshots: image image

xxlest avatar May 30 '20 10:05 xxlest

I have the same error using Minikube.

hadim avatar Jun 03 '20 20:06 hadim

This small hack fixed it for me:

# Temporary workaround (should be fixed soon): export the workflow as a dict
# and set the two header fields by hand.
manifest = wk.to_dict()
manifest.update({"apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow"})

hadim avatar Jun 03 '20 20:06 hadim

This small hack fixed it for me:

# Temporary workaround (should be fixed soon): export the workflow as a dict
# and set the two header fields by hand.
manifest = wk.to_dict()
manifest.update({"apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow"})

Could you show me the full code? I can't figure out how to submit with the Python DSL.

xxlest avatar Jun 04 '20 03:06 xxlest

Here it is:

import secrets

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import task, dependencies
from argo.workflows.dsl.templates import parameter
from argo.workflows.dsl.templates import inputs
from argo.workflows.dsl.templates import template
from argo.workflows.dsl.templates import V1alpha1Parameter
from argo.workflows.dsl.templates import V1alpha1Template
from argo.workflows.dsl.templates import V1Container


# Define your workflow
# Diamond-shaped DAG, per the @dependencies lists below: A has no
# dependencies, B and C each depend on A, and D depends on both B and C.
# Every task forwards its `message` parameter to the shared `echo` template.
class DagDiamond(Workflow):

    # Task A: no @dependencies; its "message" parameter is set to "A".
    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task B: depends on A; its "message" parameter is set to "B".
    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task C: depends on A; its "message" parameter is set to "C".
    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task D: depends on both B and C; its "message" parameter is set to "D".
    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Template used by all four tasks: declares one input parameter named
    # "message" and returns an alpine:3.7 container spec whose command is
    # `echo` followed by the "{{inputs.parameters.message}}" placeholder.
    # NOTE(review): the placeholder is presumably substituted by Argo at run
    # time with the task's message value - confirm against the Argo docs.
    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
    
# Instantiate the workflow defined above
wk = DagDiamond()

# Load the kube config, then build the API client used to talk to the cluster
load_kube_config()
v1alpha1 = V1alpha1Api()

# Temporary workaround (should be fixed soon): export the workflow as a dict
# and complete it by hand before sending it to the API.
manifest = wk.to_dict()
manifest.update({"apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow"})
# Append a random hex suffix (5 random bytes) to the workflow name.
manifest["metadata"]["name"] = manifest["metadata"]["name"] + secrets.token_hex(5)
manifest["spec"]["serviceAccountName"] = "argo"

# Create the workflow in the "argo" namespace
sub_wk = v1alpha1.create_namespaced_workflow(namespace="argo", body=manifest)

# Count the workflows in the namespace (executed or not)
wfs = v1alpha1.list_namespaced_workflows(namespace="argo")
print(f"{len(wfs.items)} workflows on the cluster.")

hadim avatar Jun 04 '20 10:06 hadim

Here it is:

import secrets

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import task, dependencies
from argo.workflows.dsl.templates import parameter
from argo.workflows.dsl.templates import inputs
from argo.workflows.dsl.templates import template
from argo.workflows.dsl.templates import V1alpha1Parameter
from argo.workflows.dsl.templates import V1alpha1Template
from argo.workflows.dsl.templates import V1Container


# Define your workflow
# Diamond-shaped DAG, per the @dependencies lists below: A has no
# dependencies, B and C each depend on A, and D depends on both B and C.
# Every task forwards its `message` parameter to the shared `echo` template.
class DagDiamond(Workflow):

    # Task A: no @dependencies; its "message" parameter is set to "A".
    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task B: depends on A; its "message" parameter is set to "B".
    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task C: depends on A; its "message" parameter is set to "C".
    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Task D: depends on both B and C; its "message" parameter is set to "D".
    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # Template used by all four tasks: declares one input parameter named
    # "message" and returns an alpine:3.7 container spec whose command is
    # `echo` followed by the "{{inputs.parameters.message}}" placeholder.
    # NOTE(review): the placeholder is presumably substituted by Argo at run
    # time with the task's message value - confirm against the Argo docs.
    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
    
# Instantiate the workflow defined above
wk = DagDiamond()

# Load the kube config, then build the API client used to talk to the cluster
load_kube_config()
v1alpha1 = V1alpha1Api()

# Temporary workaround (should be fixed soon): export the workflow as a dict
# and complete it by hand before sending it to the API.
manifest = wk.to_dict()
manifest.update({"apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow"})
# Append a random hex suffix (5 random bytes) to the workflow name.
manifest["metadata"]["name"] = manifest["metadata"]["name"] + secrets.token_hex(5)
manifest["spec"]["serviceAccountName"] = "argo"

# Create the workflow in the "argo" namespace
sub_wk = v1alpha1.create_namespaced_workflow(namespace="argo", body=manifest)

# Count the workflows in the namespace (executed or not)
wfs = v1alpha1.list_namespaced_workflows(namespace="argo")
print(f"{len(wfs.items)} workflows on the cluster.")

Thanks very much, it works for me as well.

xxlest avatar Jun 06 '20 04:06 xxlest

This should probably be solved here: https://github.com/argoproj-labs/argo-python-dsl

Also, the PyPI package argo-python-dsl now points to the community-maintained release.

binarycrayon avatar Oct 11 '20 20:10 binarycrayon