
Raw Container Task Local Execution

Open Future-Outlier opened this issue 1 year ago • 8 comments

Tracking issue

https://github.com/flyteorg/flyte/issues/3876

Why are the changes needed?

We want to let users run raw-container tasks locally.

Note: I did not add support for ~~datetime.timedelta~~, List, and Dict, because I found that copilot has bugs when handling them. I will support them once those bugs are fixed. (This implies that we have to fix flytecopilot for nested types and collection types first.)

What changes were proposed in this pull request?

  1. Override the local_execute function in class ContainerTask(PythonTask).
  2. Use the docker Python library to execute the docker command.
  3. Support int, str, float, bool, datetime.datetime, FlyteDirectory and FlyteFile. ~~I will strip white space because it will be too complicated to support string with white space.~~ ~~(datetime.datetime will be affected)~~
  4. Support FlyteDirectory and FlyteFile as inputs. ~~However, in remote cases, flytecopilot will fail, so I think I can support it locally first, and create an issue to support remote cases.~~
  5. Support direct file paths and template-style references as inputs. For example, {{.inputs.infile}} and /var/inputs/infile are both valid inputs.
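
As a rough illustration of item 5, the sketch below shows how a command list containing both styles of reference could be rendered. It is not the code from this PR: `INPUT_TEMPLATE` and `render_command` are hypothetical names, and the sketch assumes that a templated argument is replaced by the string form of the input value while a direct path is passed through unchanged.

```python
import re
from typing import Any, Dict, List

# Hypothetical pattern: matches a whole argument such as "{{.inputs.a}}" or "{{ .inputs.a }}".
INPUT_TEMPLATE = re.compile(r"^\{\{\s*\.inputs\.(\w+)\s*\}\}$")


def render_command(command: List[str], inputs: Dict[str, Any]) -> List[str]:
    """Return a copy of `command` with template references replaced by input values."""
    rendered = []
    for arg in command:
        match = INPUT_TEMPLATE.match(arg)
        if match:
            name = match.group(1)
            if name not in inputs:
                raise KeyError(f"command references unknown input {name!r}")
            rendered.append(str(inputs[name]))
        else:
            # Literal arguments, including direct paths such as
            # /var/inputs/infile, pass through unchanged.
            rendered.append(arg)
    return rendered


if __name__ == "__main__":
    cmd = ["python", "return_same_value.py", "{{.inputs.a}}", "{{.inputs.d}}", "/var/outputs"]
    print(render_command(cmd, {"a": 0, "d": "hello world"}))
```

Because each argument stays a separate list element, a string input that contains spaces remains a single argument and needs no stripping or quoting.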

How was this patch tested?

  1. Local execution with 4 examples. Reference: https://docs.flyte.org/en/latest/user_guide/customizing_dependencies/raw_containers.html#raw-container
     Note: the julia image has an error on arm64. I tested it with docker run -it inside the container and found the error (screenshot).

  2. Local execution with 5 primitive types as input and output.

  3. FlyteFile task with input and output

  4. FlyteFile task with output only

  5. FlyteDirectory task with input and output

  6. FlyteDirectory task with output only

Note: I've provided an image on Docker Hub, so you can switch to this branch and test it directly. Image: futureoutlier/rawcontainer:0320

Setup process

git clone https://github.com/flyteorg/flytekit
gh pr checkout 2258
make setup
pip install -e .
python raw_container_local_execution.py

python example:

import logging
from typing import Tuple, List
import datetime
from flytekit import ContainerTask, kwtypes, workflow, task
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory


logger = logging.getLogger(__file__)

@workflow
def primitive_types(a: int, b: bool, c: float, d: str, e: datetime.datetime, f: datetime.timedelta) \
                            -> Tuple[int, bool, float, str, datetime.datetime, datetime.timedelta]:
    return python_return_same_values(a=a, b=b, c=c, d=d, e=e, f=f)

python_return_same_values = ContainerTask(
    name="python_return_same_values",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta),
    outputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_same_value.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "{{.inputs.c}}",
        "{{.inputs.d}}",
        "{{.inputs.e}}",
        "{{.inputs.f}}",
        "/var/outputs",
    ],
)
 

flyte_file_io = ContainerTask(
    name="flyte_file_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytefile.py",
        "{{.inputs.inputs}}",
        # "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

flyte_dir_io = ContainerTask(
    name="flyte_dir_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytedir.py",
        "{{.inputs.inputs}}",
        # "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

flyte_dir_out_only = ContainerTask(
    name="flyte_dir_out_only",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    # inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_flytedir.py",
        "/var/outputs/out",
    ],
)

flyte_file_out_only = ContainerTask(
    name="flyte_file_out_only",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    # inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteFile),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_flytefile.py",
        "/var/outputs/out",
    ],
)

@task
def flyte_file_task() -> FlyteFile:
    with open("./a.txt", "w") as file:
        file.write("This is a.txt file.")
    return FlyteFile(path="./a.txt")

@workflow
def flyte_file_io_wf() -> FlyteFile:
    ff = flyte_file_task()
    return flyte_file_io(inputs=ff)

@task
def flyte_dir_task() -> FlyteDirectory:
    from pathlib import Path
    import flytekit
    import os

    working_dir = flytekit.current_context().working_directory
    local_dir = Path(os.path.join(working_dir, "csv_files"))
    local_dir.mkdir(exist_ok=True)
    write_file = local_dir / "a.txt"
    with open(write_file, "w") as file:
        file.write("This is for flyte dir.")

    return FlyteDirectory(path=str(local_dir))

@workflow
def flyte_dir_io_wf() -> FlyteDirectory:
    fd = flyte_dir_task()
    return flyte_dir_io(inputs=fd)

if __name__ == "__main__":
    print(flyte_dir_io_wf())
    print(flyte_file_io_wf())
    print(primitive_types(a=0, b=False, c=3.0, d="hello", e=datetime.datetime.now(), 
                          f=datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5)))
    print(flyte_dir_out_only())
    print(flyte_file_out_only())

Dockerfile

# Use the Alpine Linux version of Python 3.9 as the base image
FROM python:3.9-alpine

# Set the working directory in the container
WORKDIR /root

# Copy the test scripts into the /root working directory of the image
COPY ./write_flytefile.py /root/write_flytefile.py
COPY ./write_flytedir.py /root/write_flytedir.py
COPY ./return_same_value.py /root/return_same_value.py
COPY ./return_flytefile.py /root/return_flytefile.py
COPY ./return_flytedir.py /root/return_flytedir.py


# Specify the command to run when the container starts. Here, we use /bin/sh to start a shell.
CMD ["/bin/sh"]

write_flytefile.py

import sys
from pathlib import Path

def copy_content_to_output(input_path: Path, output_path: Path):

    content = input_path.read_text()

    output_path.write_text(content)

if __name__ == "__main__":
    if len(sys.argv) > 2:
        input_path = Path(sys.argv[1])
        output_path = Path(sys.argv[2])
        copy_content_to_output(input_path, output_path)
    else:
        print("Usage: script.py <input_path> <output_path>")

write_flytedir.py

import sys
from pathlib import Path
import shutil

def copy_directory(input_path: Path, output_path: Path):
    if not input_path.exists() or not input_path.is_dir():
        print(f"Error: {input_path} does not exist or is not a directory.")
        return

    try:
        shutil.copytree(input_path, output_path)
        print(f"Directory {input_path} was successfully copied to {output_path}")
    except Exception as e:
        print(f"Error copying {input_path} to {output_path}: {e}")

if __name__ == "__main__":
    if len(sys.argv) > 2:
        input_path = Path(sys.argv[1])
        output_path = Path(sys.argv[2])
        copy_directory(input_path, output_path)
    else:
        print("Usage: script.py <input_directory_path> <output_directory_path>")

return_same_value.py

import sys

def write_output(output_dir, output_file, v):
    # Write one value as text to <output_dir>/<output_file>
    with open(f"{output_dir}/{output_file}", "w") as f:
        f.write(str(v))

def main(*args, output_dir):
    # Generate output files for each input argument
    for i, arg in enumerate(args, start=1):
        # Using i to generate filenames like 'a', 'b', 'c', ...
        output_file = chr(ord('a') + i - 1)
        write_output(output_dir, output_file, arg)

if __name__ == "__main__":
    *inputs, output_dir = sys.argv[1:]  # Unpack all inputs except for the last one for output_dir

    main(*inputs, output_dir=output_dir)

return_flytefile.py

import sys
from pathlib import Path


def write_output(out_path: Path):
    out_path.write_text("return flyte file content")


if __name__ == "__main__":
    write_output(Path(sys.argv[1]))

return_flytedir.py

import sys
from pathlib import Path

def write_output(directory_path: Path):
    directory_path.mkdir(parents=True, exist_ok=True)
    
    file_path = directory_path / "test_flyte_dir_file"
    
    file_path.write_text("prove flytedir works")

if __name__ == "__main__":
    if len(sys.argv) > 1:
        write_output(Path(sys.argv[1]))
    else:
        print("Usage: script.py <output_directory_path>")

Screenshots

Support documentation example image

All examples above image

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [x] All new and existing tests passed.
  • [x] All commits are signed-off.

Related PRs

https://github.com/flyteorg/flytekit/pull/1745

Future-Outlier avatar Mar 12 '24 13:03 Future-Outlier

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 85.69%. Comparing base (bf38b8e) to head (a9b0e1e). Report is 6 commits behind head on master.

:exclamation: Current head a9b0e1e differs from pull request most recent head b068357. Consider uploading reports for the commit b068357 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2258      +/-   ##
==========================================
+ Coverage   83.04%   85.69%   +2.65%     
==========================================
  Files         324       20     -304     
  Lines       24861     1279   -23582     
  Branches     3547        0    -3547     
==========================================
- Hits        20645     1096   -19549     
+ Misses       3591      183    -3408     
+ Partials      625        0     -625     


codecov[bot] avatar Mar 12 '24 13:03 codecov[bot]

Can we check whether docker is installed and then use it? Please do not add a docker dependency.

kumare3 avatar Mar 13 '24 05:03 kumare3

No problem, will do that.

Future-Outlier avatar Mar 13 '24 05:03 Future-Outlier
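
A minimal sketch of the request above, assuming the docker Python SDK is treated as an optional package that is detected at run time rather than listed as a hard dependency. The helper names `docker_sdk_available` and `require_docker_sdk` are hypothetical and are not part of flytekit.

```python
import importlib.util


def docker_sdk_available() -> bool:
    """Return True if the `docker` Python package can be imported in this environment."""
    return importlib.util.find_spec("docker") is not None


def require_docker_sdk() -> None:
    """Raise a clear error up front instead of an ImportError deep inside task execution."""
    if not docker_sdk_available():
        raise RuntimeError(
            "Local execution of a ContainerTask needs the docker Python package; "
            "install it with `pip install docker`."
        )


if __name__ == "__main__":
    print("docker SDK installed:", docker_sdk_available())
```

Checking with `find_spec` avoids importing the package at module load time, so users who never run a container task locally are not affected.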

cc @eapolinario, @pingsutw Could you please help review? I think I might have missed some edge cases. I will add unit tests once you think the approach looks good, thank you!

Future-Outlier avatar Mar 13 '24 10:03 Future-Outlier

cc @wild-endeavor If possible, please take a look too, thank you!

Future-Outlier avatar Mar 13 '24 10:03 Future-Outlier

Question:

  1. Can I add docker to dev-requirements.in, so that we can add unit tests for local execution?
  2. Are we comfortable with the implementation of local_execute? I think it would be better to split it into smaller functions; however, most of the implementation is adapted from here: https://github.com/flyteorg/flytekit/blob/329aea8545aa9ea9f903acd7e006f3f12efb9cbb/flytekit/core/base_task.py#L251-L328
  3. Are there any edge cases for container tasks? I assume every input should have the prefix "{{.inputs." and the suffix "}}".

Future-Outlier avatar Mar 15 '24 04:03 Future-Outlier

First of all, thanks for your contribution! This will be a massive improvement to the local flytekit use cases.

This is shaping up to be a great PR. I left some comments along the way. But can you make sure to write tests where container tasks are involved in workflows that combine regular tasks and container tasks?

Really great advice! I'll do it today, thank you!

Future-Outlier avatar Mar 27 '24 23:03 Future-Outlier

@eapolinario

These are my updates:

  1. Command inputs now use an array, so strings no longer need to be split. (sh -c is removed now.)
  2. Added support for datetime.timedelta.
  3. Implemented regex for input parsing.
  4. Updated all related tests.
  5. Improved string to boolean conversion: output_dict[k] = False if output_val.lower() == "false" else True.

Thank you very much!

Future-Outlier avatar Mar 28 '24 09:03 Future-Outlier
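
The boolean rule in item 5 above matters because Python's built-in `bool("False")` is `True` for any non-empty string. The sketch below shows one way such output conversion could look. It is an illustration only: `parse_bool`, `CONVERTERS`, and `convert_output` are hypothetical names, and the datetime entry assumes the container writes a string that `datetime.datetime.fromisoformat` accepts.

```python
import datetime
from typing import Any, Callable, Dict


def parse_bool(text: str) -> bool:
    # Mirrors the rule quoted above: only "false" (in any case) maps to False.
    return False if text.lower() == "false" else True


# Hypothetical converter table keyed by the declared output type.
CONVERTERS: Dict[type, Callable[[str], Any]] = {
    bool: parse_bool,
    int: int,
    float: float,
    str: str,
    datetime.datetime: datetime.datetime.fromisoformat,
}


def convert_output(text: str, expected_type: type) -> Any:
    """Convert the raw text of one output file to `expected_type`."""
    return CONVERTERS[expected_type](text)


if __name__ == "__main__":
    print(convert_output("False", bool), convert_output("3.0", float))
```

Note that under this rule every string other than "false" becomes True, including "0" and the empty string. A stricter variant could reject values that are neither "true" nor "false".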

Hi, @eapolinario Can you help approve this PR? Companies like AXIS and Tesla might want this feature, thank you!

Future-Outlier avatar Apr 05 '24 08:04 Future-Outlier

This is looking pretty good, but we should increase the test coverage.

Can you add a separate test suite (call it test_local_raw_container.py under tests/flytekit/unit/core), add a Dockerfile there, and use it to build a local image to be used in tests? Add a few more tests to show that the manipulation of the input/output data dir works. Also, make sure to add examples where FlyteFile is used. This will help the implementation catch up with the current state of copilot.

I will do it today, thank you

Future-Outlier avatar Apr 09 '24 01:04 Future-Outlier

Hi, @eapolinario ,

I've implemented your suggestions:

  • Added 3 Python files for testing local execution of FlyteFile, FlyteDirectory, and primitive types within container tasks.
  • Created a Dockerfile to include the above test files.
  • Introduced 4 tests:
    1. test_flytefile_wf for FlyteFile I/O,
    2. test_flytedir_wf for FlyteDirectory I/O,
    3. test_primitive_types_wf for various primitive types,
    4. test_input_output_dir_manipulation to validate input/output directory handling.

Note: Despite FlyteDirectory not being supported by copilot, I've included it for local execution, ready to extend support based on user feedback.

If it looks good to you, can you help approve it? This is a huge improvement to companies using ContainerTask, thank you!

Future-Outlier avatar Apr 09 '24 04:04 Future-Outlier

So clean. Thank you!

Thank you both too😭😭😭

Future-Outlier avatar Apr 17 '24 23:04 Future-Outlier