flytekit icon indicating copy to clipboard operation
flytekit copied to clipboard

[Core feature] map_task to support ContainerTask

Open machichima opened this issue 7 months ago • 5 comments

Tracking issue

flyteorg/flyte#6277

Why are the changes needed?

Currently, we cannot use ContainerTask in map task. This PR is trying to make this integration works.

What changes were proposed in this pull request?

  • Make ContainerTask work with map task in local and remote
  • Add unit test for running ContainerTask with map task locally

How was this patch tested?

unit test & example code below:

Execute following code locally and remotly should pass:

  • local: pyflyte run containertask_maptask.py wf
  • remote: pyflyte run --remote containertask_maptask.py wf
# containertask_maptask.py
import functools

from flytekit import ContainerTask, kwtypes, map_task, workflow

calculate_ellipse_area_python_template_style = ContainerTask(
    name="calculate_ellipse_area_python_template_style",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float),
    image="ghcr.io/flyteorg/rawcontainers-python:v2",
    command=[
        "python",
        "calculate-ellipse-area.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "/var/outputs",
    ],
)


@workflow
def nomap_wf(a: float = 0.5, b: float = 0.4) -> float:
    res = calculate_ellipse_area_python_template_style(a, b)
    return res


@workflow
def wf(a: list[float] = [3.0, 4.0, 5.0], b: float = 4.0) -> list[float]:
    partial_task = functools.partial(calculate_ellipse_area_python_template_style, b=b)
    res = map_task(partial_task)(a=a)
    return res

Setup process

Screenshots

  • Result in remote

image

  • Result in local

image

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [x] All new and existing tests passed.
  • [x] All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This pull request enhances map task functionality by integrating ContainerTask support for both local and remote execution. It includes significant refactoring of the execute method to return native Python types, improving usability and maintainability. Comprehensive unit tests validate this integration and confirm expected behavior, ensuring expected functionality in both environments.

machichima avatar May 17 '25 15:05 machichima