
Add support for raw containers in the map task

Open • pingsutw opened this pull request 2 years ago • 6 comments

TL;DR

  1. Add support for raw containers in the map task.
  2. In the map task, use the raw container's own command instead of replacing it with pyflyte-map-execute ...
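Item 2 can be made concrete with a minimal, hypothetical sketch of the decision a map task faces when it builds each subtask's container command. The function name `choose_subtask_command` and its parameters are illustrative assumptions, not flytekit's actual internals. Only the `pyflyte-map-execute` entrypoint name comes from this PR.

```python
from typing import List, Sequence


def choose_subtask_command(
    is_raw_container: bool,
    raw_command: Sequence[str],
    entrypoint_args: Sequence[str],
) -> List[str]:
    """Hypothetical helper: pick the command a map-task subtask runs.

    Not flytekit's real implementation; it only illustrates the
    behavior described in the TL;DR.
    """
    if is_raw_container:
        # Raw containers keep the command declared on the ContainerTask,
        # so the user's own program (e.g. `python test.py ...`) runs.
        return list(raw_command)
    # Python function tasks are still routed through the map-task entrypoint.
    return ["pyflyte-map-execute", *entrypoint_args]
```

For the `ContainerTask` in the example in this description, `raw_command` would be `["python", "test.py", "{{.inputs.a}}", "/var/outputs"]`, and that list would be passed through unchanged.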

Type

  • [x] Bug Fix
  • [ ] Feature
  • [ ] Plugin

Are all requirements met?

  • [x] Code completed
  • [x] Smoke tested
  • [ ] Unit tests added
  • [ ] Code documentation added
  • [ ] Any pending items have an associated Issue

Complete description

Here is an example that maps a raw ContainerTask over a list produced by a Python task:

from typing import List

from flytekit import map_task, task, workflow, ContainerTask, kwtypes

calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-python",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=int),
    outputs=kwtypes(area=float),
    image="pingsutw/raw-container:v9",
    command=[
        "python",
        "test.py",
        "{{.inputs.a}}",
        "/var/outputs",
    ],
)


@task
def coalesce(b: List[str]) -> str:
    coalesced = "".join(b)
    return coalesced


@task
def g_l(n: int) -> List[int]:
    res = []
    for i in range(n):
        res.append(i)
    return res


@workflow
def wf(n: int = 2):
    l = g_l(n=n)
    map_task(calculate_ellipse_area_shell)(a=l)


if __name__ == "__main__":
    result = wf()
  • Dockerfile
FROM python:3.10-slim-buster

WORKDIR /root

COPY *.py /root/
  • test.py
import math
import sys
import os

def write_output(output_dir, output_file, v):
    with open(f"{output_dir}/{output_file}", "w") as f:
        f.write(str(v))


def calculate_area(a, b):
    return math.pi * a * b


def main(a, output_dir):
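    # Parse the rendered list input, e.g. "[0, 1]", into integers
    # (int() instead of eval(), so the input is never executed as code)
    li = a.strip('][').split(',')
    res = [int(i) for i in li]

    # BATCH_JOB_ARRAY_INDEX_VAR_NAME holds the *name* of the environment
    # variable that carries this subtask's index within the array job
    index = int(os.environ[os.environ["BATCH_JOB_ARRAY_INDEX_VAR_NAME"]])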

    area = calculate_area(res[index], 3)

    write_output(output_dir, "area", f"{area}")
    write_output(output_dir, "_SUCCESS", "")


if __name__ == "__main__":
    a = sys.argv[1]
    output_dir = sys.argv[2]

    main(a, output_dir)
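The index lookup in test.py is the part most likely to trip people up. `BATCH_JOB_ARRAY_INDEX_VAR_NAME` does not hold the index itself. It holds the *name* of the variable that does. Below is a minimal, testable sketch of that lookup. It assumes the list input arrives as a Python-style literal such as `[0, 1]`, which is what test.py's own bracket-stripping implies. The helper names are hypothetical. The optional `BATCH_JOB_ARRAY_INDEX_OFFSET` handling is an assumption borrowed from flytekit's Python map-task code, not something this PR specifies.

```python
import ast
import math
import os
from typing import List, Mapping, Optional


def parse_int_list(raw: str) -> List[int]:
    """Parse a rendered list input such as "[0, 1]" without eval()."""
    # ast.literal_eval only accepts Python literals, so no code is executed.
    return [int(v) for v in ast.literal_eval(raw)]


def resolve_array_index(env: Optional[Mapping[str, str]] = None) -> int:
    """Return this subtask's index within the array job.

    BATCH_JOB_ARRAY_INDEX_VAR_NAME names the variable that holds the index.
    Adding BATCH_JOB_ARRAY_INDEX_OFFSET (default 0) is an assumption here.
    """
    env = os.environ if env is None else env
    var_name = env.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME")
    if var_name is None:
        raise RuntimeError("BATCH_JOB_ARRAY_INDEX_VAR_NAME is not set")
    offset = int(env.get("BATCH_JOB_ARRAY_INDEX_OFFSET", "0"))
    return offset + int(env[var_name])


def area_for_subtask(raw_inputs: str, env: Mapping[str, str], b: float = 3) -> float:
    """Same computation as test.py: pi * a * b for this subtask's element."""
    values = parse_int_list(raw_inputs)
    return math.pi * values[resolve_array_index(env)] * b


# MY_ARRAY_INDEX is a hypothetical variable name used only for this example.
env = {"BATCH_JOB_ARRAY_INDEX_VAR_NAME": "MY_ARRAY_INDEX", "MY_ARRAY_INDEX": "1"}
print(area_for_subtask("[0, 1]", env))  # pi * 1 * 3
```

Passing the environment in as a mapping keeps the lookup testable without mutating `os.environ`. Raising early when the indirection variable is missing gives a clearer error than the `TypeError` that `os.environ.get(None)` would produce.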

Tracking Issue

https://flyte-org.slack.com/archives/CP2HDHKE1/p1678230956906899

Follow-up issue

https://github.com/flyteorg/flytecopilot/pull/54
https://github.com/flyteorg/flyteplugins/pull/329

pingsutw, Mar 09 '23 23:03

Codecov Report

Merging #1547 (fcfad21) into master (e3cee83) will increase coverage by 0.01%. The diff coverage is 92.00%.

@@            Coverage Diff             @@
##           master    #1547      +/-   ##
==========================================
+ Coverage   69.93%   69.95%   +0.01%     
==========================================
  Files         319      319              
  Lines       29569    29587      +18     
  Branches     5332     5336       +4     
==========================================
+ Hits        20680    20697      +17     
- Misses       8370     8371       +1     
  Partials      519      519              
Impacted Files                               Coverage Δ
flytekit/core/container_task.py              38.57% <0.00%> (ø)
flytekit/core/map_task.py                    52.53% <92.30%> (+2.19%) ↑
flytekit/tools/translator.py                 74.39% <100.00%> (ø)
tests/flytekit/unit/core/test_map_task.py    95.32% <100.00%> (+0.25%) ↑
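The figures in this report can be sanity-checked directly. Coverage here is hits divided by tracked lines (hits + misses + partials). The sketch below assumes Codecov's default of rounding *down* to two decimal places. That assumption is consistent with the reported 69.93% and 69.95%. It is also consistent with a +0.01% delta computed from the unrounded values, rather than from the two rounded percentages, which would give 0.02.

```python
import math


def raw_percent(hits: int, misses: int, partials: int) -> float:
    """Coverage as a percentage: hits / (hits + misses + partials) * 100."""
    return hits / (hits + misses + partials) * 100


def round_down(value: float, precision: int = 2) -> float:
    """Round down (floor) at `precision` decimals, the assumed Codecov default."""
    factor = 10 ** precision
    return math.floor(value * factor) / factor


base = raw_percent(hits=20680, misses=8370, partials=519)  # master (e3cee83)
head = raw_percent(hits=20697, misses=8371, partials=519)  # #1547 (fcfad21)

print(round_down(base), round_down(head), round_down(head - base))
# 69.93 69.95 0.01
```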


codecov[bot], Mar 10 '23 00:03

Before merging, let's copy the minimum copilot and propeller versions into the PR description.

wild-endeavor, Mar 20 '23 18:03

@pingsutw shall we merge this?

kumare3, Jan 02 '24 05:01