argo-python-dsl
argo-python-dsl copied to clipboard
submit failed, prompt "argo.workflows.client.rest.ApiException: (400)"
Describe the bug i tried submit argo workflow use python which is "hello-word" dsl, but failed Could help me have a look? python code:
from argo.workflows.dsl import Workflow from argo.workflows.dsl import template import yaml
from argo.workflows.dsl.templates import V1Container
class HelloWorld(Workflow):
entrypoint = "whalesay"
@template
def whalesay(self) -> V1Container:
container = V1Container(
image="docker/whalesay:latest",
name="whalesay",
command=["cowsay"],
args=["hello world"]
)
return container
wf=HelloWorld() print(wf)
from argo.workflows.client import V1alpha1Api from argo.workflows.config import load_kube_config
load_kube_config() # loads local configuration from ~/.kube/config v1alpha1 = V1alpha1Api() wfs = v1alpha1.list_namespaced_workflows(namespace="default") print(wfs) #v1alpha1.create_namespaced_workflow("default", wf) wf.submit(client=V1alpha1Api(), namespace="default")
i can get wfs by client and print, but submit failed throw 400 exception
Screenshots

I have the same error using Minikube.
This small hack fixed it for me:
# Current hack to make it work: should be fixed soon
manifest = wk.to_dict()
manifest["apiVersion"] = "argoproj.io/v1alpha1"
manifest["kind"] = "Workflow"
This small hack fixed it for me:
# Current hack to make it work: should be fixed soon manifest = wk.to_dict() manifest["apiVersion"] = "argoproj.io/v1alpha1" manifest["kind"] = "Workflow"
could you show me full code? i can't get how to submit by python dsl
Here it is:
import secrets
from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import task, dependencies
from argo.workflows.dsl.templates import parameter
from argo.workflows.dsl.templates import inputs
from argo.workflows.dsl.templates import template
from argo.workflows.dsl.templates import V1alpha1Parameter
from argo.workflows.dsl.templates import V1alpha1Template
from argo.workflows.dsl.templates import V1Container
# Define your workflow
class DagDiamond(Workflow):
@task
@parameter(name="message", value="A")
def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
return self.echo(message=message)
@task
@parameter(name="message", value="B")
@dependencies(["A"])
def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
return self.echo(message=message)
@task
@parameter(name="message", value="C")
@dependencies(["A"])
def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
return self.echo(message=message)
@task
@parameter(name="message", value="D")
@dependencies(["B", "C"])
def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
return self.echo(message=message)
@template
@inputs.parameter(name="message")
def echo(self, message: V1alpha1Parameter) -> V1Container:
container = V1Container(
image="alpine:3.7",
name="echo",
command=["echo", "{{inputs.parameters.message}}"],
)
return container
# Create a workflow
wk = DagDiamond()
# Load the API to communicate with the cluster
load_kube_config()
v1alpha1 = V1alpha1Api()
# Current hack to make it work: should be fixed soon
manifest = wk.to_dict()
manifest["apiVersion"] = "argoproj.io/v1alpha1"
manifest["kind"] = "Workflow"
manifest["metadata"]["name"] += secrets.token_hex(5)
manifest["spec"]["serviceAccountName"] = "argo"
# Submit the workflow to the cluster
sub_wk = v1alpha1.create_namespaced_workflow(namespace="argo", body=manifest)
# List workflows (executed or not)
wfs = v1alpha1.list_namespaced_workflows(namespace="argo")
print(f"{len(wfs.items)} workflows on the cluster.")
Here it is:
import secrets from argo.workflows.client import V1alpha1Api from argo.workflows.config import load_kube_config from argo.workflows.dsl import Workflow from argo.workflows.dsl.tasks import task, dependencies from argo.workflows.dsl.templates import parameter from argo.workflows.dsl.templates import inputs from argo.workflows.dsl.templates import template from argo.workflows.dsl.templates import V1alpha1Parameter from argo.workflows.dsl.templates import V1alpha1Template from argo.workflows.dsl.templates import V1Container # Define your workflow class DagDiamond(Workflow): @task @parameter(name="message", value="A") def A(self, message: V1alpha1Parameter) -> V1alpha1Template: return self.echo(message=message) @task @parameter(name="message", value="B") @dependencies(["A"]) def B(self, message: V1alpha1Parameter) -> V1alpha1Template: return self.echo(message=message) @task @parameter(name="message", value="C") @dependencies(["A"]) def C(self, message: V1alpha1Parameter) -> V1alpha1Template: return self.echo(message=message) @task @parameter(name="message", value="D") @dependencies(["B", "C"]) def D(self, message: V1alpha1Parameter) -> V1alpha1Template: return self.echo(message=message) @template @inputs.parameter(name="message") def echo(self, message: V1alpha1Parameter) -> V1Container: container = V1Container( image="alpine:3.7", name="echo", command=["echo", "{{inputs.parameters.message}}"], ) return container # Create a workflow wk = DagDiamond() # Load the API to communicate with the cluster load_kube_config() v1alpha1 = V1alpha1Api() # Current hack to make it work: should be fixed soon manifest = wk.to_dict() manifest["apiVersion"] = "argoproj.io/v1alpha1" manifest["kind"] = "Workflow" manifest["metadata"]["name"] += secrets.token_hex(5) manifest["spec"]["serviceAccountName"] = "argo" # Submit the workflow to the cluster sub_wk = v1alpha1.create_namespaced_workflow(namespace="argo", body=manifest) # List workflows (executed or not) wfs = v1alpha1.list_namespaced_workflows(namespace="argo") print(f"{len(wfs.items)} workflows on the cluster.")
thanks very much, i got sucess also
should probably be solved here https://github.com/argoproj-labs/argo-python-dsl
Also pypi argo-python-dsl now points to the community maintained release