flytekit
[Core feature] map_task to support ContainerTask
Tracking issue
flyteorg/flyte#6277
Why are the changes needed?
Currently, ContainerTask cannot be used in a map task. This PR makes that integration work.
What changes were proposed in this pull request?
- Make ContainerTask work with map task in local and remote
- Add unit test for running ContainerTask with map task locally
How was this patch tested?
Unit test and the example code below.
Running the following code both locally and remotely should pass:
- local: pyflyte run containertask_maptask.py wf
- remote: pyflyte run --remote containertask_maptask.py wf
# containertask_maptask.py
import functools

from flytekit import ContainerTask, kwtypes, map_task, workflow

# Raw-container task: inputs a and b are templated into the command line,
# and the script writes its result under /var/outputs.
calculate_ellipse_area_python_template_style = ContainerTask(
    name="calculate_ellipse_area_python_template_style",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)


# Baseline: call the ContainerTask directly, without map_task.
@workflow
def nomap_wf(a: float = 0.5, b: float = 0.4) -> float:
    res = calculate_ellipse_area_python_template_style(a, b)
    return res


# Map over the list a while binding b to a single value via functools.partial.
@workflow
def wf(a: list[float] = [3.0, 4.0, 5.0], b: float = 4.0) -> list[float]:
    partial_task = functools.partial(calculate_ellipse_area_python_template_style, b=b)
    res = map_task(partial_task)(a=a)
    return res
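For reviewers who want to sanity-check the mapped results without Docker or a cluster, the fan-out in wf can be approximated in plain Python. This is only a sketch under stated assumptions: ellipse_area assumes the container script computes pi * a * b, and local_map is a hypothetical stand-in for map_task, not a flytekit API.

```python
import functools
import math


def ellipse_area(a: float, b: float) -> float:
    # Assumption: mirrors what calculate-ellipse-area.py computes in the container.
    return math.pi * a * b


def local_map(fn, **mapped_inputs):
    # Hypothetical stand-in for map_task: call fn once per element of the
    # mapped list inputs, passing each element as a keyword argument.
    names = list(mapped_inputs)
    return [
        fn(**dict(zip(names, values)))
        for values in zip(*mapped_inputs.values())
    ]


def wf_equivalent(a, b=4.0):
    # b is bound once with functools.partial; only a is mapped over.
    partial_fn = functools.partial(ellipse_area, b=b)
    return local_map(partial_fn, a=a)


areas = wf_equivalent([3.0, 4.0, 5.0])
print(areas)
```

If the assumption about the script holds, the list returned by wf should match these values element by element, in the same order as the input list a.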
Setup process
Screenshots
- Result in remote
- Result in local
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [x] All new and existing tests passed.
- [x] All commits are signed-off.