argo-python-dsl
Mirrored to https://github.com/argoproj-labs/argo-python-dsl
Python DSL for Argo Workflows
If you're new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive and the Argo examples provide an exhaustive explanation.
For a more experienced audience, this DSL grants you the ability to programmatically define Argo Workflows in Python, which are then translated to the Argo YAML specification.
The DSL makes use of the Argo models defined in the Argo Python client repository. Combining the two approaches gives us full low-level control over Argo Workflows.
Getting started
Hello World
This example demonstrates the simplest functionality: defining a Workflow by subclassing the Workflow class and defining a single template with the @template decorator.
The entrypoint to the workflow is defined as an entrypoint class property.
(Side-by-side comparison of the Argo YAML and the Argo Python definitions; see the examples folder for the full listings.)
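To give a feel for the Python side, here is a minimal sketch of such a Hello World Workflow, based on the canonical Argo hello-world; the exact import paths and model names are assumptions drawn from the DSL and the Argo Python client:

```python
from argo.workflows.dsl import Workflow, template
from argo.workflows.dsl.templates import V1Container


class HelloWorld(Workflow):

    # the entrypoint class property names the template to run first
    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        # a plain container template, as in Argo's hello-world example
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"],
        )
        return container
```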
DAG: Tasks
This example demonstrates tasks defined via dependencies forming a diamond structure. Tasks are defined using the @task decorator and they must return a valid template.
The entrypoint is automatically created as main for the top-level tasks of the Workflow.
(Side-by-side comparison of the Argo YAML and the Argo Python definitions; see the examples folder for the full listings.)
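A sketch of the diamond in Python, assuming @task, @dependencies and @parameter decorators as used elsewhere in this README (import paths and model names are assumptions):

```python
from argo.workflows.dsl import Workflow, task, template
from argo.workflows.dsl.tasks import dependencies, parameter
from argo.workflows.dsl.templates import inputs
from argo.workflows.dsl.templates import V1alpha1Parameter, V1alpha1Template, V1Container


class DagDiamond(Workflow):

    # A -> (B, C) -> D; the entrypoint "main" is created automatically
    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    # each task returns this valid template
    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )
        return container
```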
Artifacts
Artifacts can be passed similarly to parameters in three forms: arguments, inputs and outputs, where arguments is the default one (simply @artifact or @parameter). The other forms are prefixed explicitly, i.e. @inputs.artifact(...).
Both artifacts and parameters are passed one by one, which means that for multiple artifacts (parameters), one should call:
```python
@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_b: V1alpha1Parameter, ...): pass
```
A complete example:
(Side-by-side comparison of the Argo YAML and the Argo Python definitions; see the examples folder for the full listings.)
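A sketch of what the Python side of such a complete example might look like, combining tasks with the arguments, inputs and outputs artifact decorators described above. The import paths and the `_from` keyword (presumably used to avoid shadowing Python's `from`) are assumptions:

```python
from argo.workflows.dsl import Workflow, task, template
from argo.workflows.dsl.tasks import arguments, dependencies
from argo.workflows.dsl.templates import inputs, outputs
from argo.workflows.dsl.templates import V1alpha1Artifact, V1alpha1Template, V1Container


class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @dependencies(["generate_artifact"])
    @arguments.artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}",
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        # writes the output artifact to /tmp/hello_world.txt
        return V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"],
        )

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        # reads the input artifact mounted at /tmp/message
        return V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat /tmp/message"],
        )
```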
Going further: closure and scope
This is where it gets quite interesting. So far, we've only scratched the surface of the benefits that the Python implementation provides.
What if we want to write native Python code and execute it as a step in the Workflow? What are our options?
Option A) is to reuse the existing mindset: dump the code into a string, pass it as the source to the V1alpha1ScriptTemplate model and wrap it with the @template decorator.
This is illustrated in the following code block:
```python
import textwrap

class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
            import random
            i = random.randint(1, 100)
            print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template
```
Which results in:
```yaml
api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main
  ...
  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'
```
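To get from the Python definition to the YAML above, the workflow instance is serialized. A minimal sketch, assuming the Workflow base class exposes a to_yaml() method (the exact serialization API is an assumption):

```python
wf = ScriptsPython()
print(wf.to_yaml())  # to_yaml() is assumed; the DSL may name its serializer differently
```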
Not bad, but also not living up to the full potential. Since we're already writing Python, why would we wrap the code in a string? This is where we introduce closures.
closures
The logic of closures is quite simple. Just wrap the function you want to execute in a container in the @closure decorator. The closure then takes care of the rest and returns a template (just as the @template decorator does).
The only thing we need to take care of is to provide it with an image which has the necessary Python dependencies installed and is present in the cluster.
There is a plan to eliminate even this step in the future, but currently it is unavoidable.
Following the previous example:
```python
class ScriptsPython(Workflow):

    ...

    @closure(
        image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import random

        i = random.randint(1, 100)
        print(i)
```
The closure implements the V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc...
Also, make sure that you import whatever library you are using; the context is not preserved, as a closure behaves like a staticmethod and is sandboxed from the module scope.
scopes
Now, what if we had a function (or a whole script) which is quite large? Wrapping it in a single Python function is not very Pythonic and gets tedious. This is where we can make use of scopes.
Say that we, for example, wanted to initialize logging before running our gen_random_int function.
```python
...

    @closure(
        scope="main",
        image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        main.init_logging()

        i = random.randint(1, 100)
        print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)
```
Notice the 3 changes that we've made:
```python
@closure(
    scope="main",                 # <--- provide the closure a scope
    image="python:alpine3.6"
)
def gen_random_int(main):         # <--- use the scope name

@scope(name="main")               # <--- add function to a scope
def init_logging(level="DEBUG"):
```
Each function in the given scope is then namespaced by the scope name and injected to the closure.
I.e. the resulting YAML looks like this:
```yaml
...
spec:
  ...
  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: |-
        import logging
        import random

        class main:
            """Scoped objects injected from scope 'main'."""

            @staticmethod
            def init_logging(level="DEBUG"):
                logging_level = getattr(logging, level, "INFO")
                logging.getLogger("__main__").setLevel(logging_level)

        main.init_logging()

        i = random.randint(1, 100)
        print(i)
```
The compilation also moves all imports to the front and removes duplicates, for convenience and a more natural look, so that the resulting YAML is easier on the eyes.
For more examples see the examples folder.
Authors:
- [ Maintainer ] Marek Cermak [email protected], [email protected]