Raw Container Task Local Execution
Tracking issue
https://github.com/flyteorg/flyte/issues/3876
Why are the changes needed?
We want users to be able to run raw-container tasks locally.
Note: I haven't added support for ~~datetime.timedelta~~, List and Dict.
I found that copilot has bugs when handling these types, so I will not support them until those bugs are fixed.
(This implies that we have to fix flytecopilot for nested types and collection types first.)
What changes were proposed in this pull request?
- override the `local_execute` function in `class ContainerTask(PythonTask)`
- use the `docker` python library to execute the docker command
- support int, str, float, bool, datetime.datetime, FlyteDirectory and FlyteFile
~~4. I will strip white space because it will be too complicated to support string with white space.~~ ~~(datetime.datetime will be affected)~~
- I've supported FlyteDirectory and FlyteFile as input, ~~however, in remote cases, flytecopilot will fail, so I think I can support it locally first, and create an issue to support remote cases.~~
- Support `direct file paths` and `template-style references` as inputs. For example, `{{.inputs.infile}}` and `/var/inputs/infile` are both valid inputs.
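To illustrate the two input styles, here is a minimal sketch of how template-style references could be substituted while direct file paths pass through untouched. The names `INPUT_TEMPLATE` and `render_command` are hypothetical and are not taken from this PR's code; the sketch also assumes every input value has already been converted to a string.

```python
import re
from typing import Dict, List

# Hypothetical pattern for template-style references such as {{.inputs.infile}}.
INPUT_TEMPLATE = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def render_command(command: List[str], inputs: Dict[str, str]) -> List[str]:
    """Replace each {{.inputs.<name>}} reference with the matching input value.

    Arguments without a template (for example a direct path such as
    /var/inputs/infile) are returned unchanged.
    """

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"command references unknown input '{name}'")
        return inputs[name]

    return [INPUT_TEMPLATE.sub(substitute, part) for part in command]


if __name__ == "__main__":
    cmd = ["python", "write_flytefile.py", "{{.inputs.infile}}", "/var/outputs/out"]
    print(render_command(cmd, {"infile": "/var/inputs/infile"}))
```

Because the command stays a list of arguments, a string input containing spaces remains a single argument after substitution.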
How was this patch tested?
- local execution with 4 examples. Reference: https://docs.flyte.org/en/latest/user_guide/customizing_dependencies/raw_containers.html#raw-container
  Note: the `julia` image has an error when using arm64. I tested it by running `docker run -it` into the container and found this error.
- local execution with 5 primitive types as input and output.
- `FlyteFile` task with input and output
- `FlyteFile` task with output only
- `FlyteDirectory` task with input and output
- `FlyteDirectory` task with output only
Note: I've provided an image on docker hub, you can switch to this branch and test it directly.
image: futureoutlier/rawcontainer:0320
Setup process
git clone https://github.com/flyteorg/flytekit
cd flytekit
gh pr checkout 2258
make setup
pip install -e .
python raw_container_local_execution.py
python example:
import logging
from typing import Tuple, List
import datetime
from flytekit import ContainerTask, kwtypes, workflow, task
from flytekit.types.file import FlyteFile
from flytekit.types.directory import FlyteDirectory
logger = logging.getLogger(__file__)
@workflow
def primitive_types(a: int, b: bool, c: float, d: str, e: datetime.datetime, f: datetime.timedelta) \
        -> Tuple[int, bool, float, str, datetime.datetime, datetime.timedelta]:
    return python_return_same_values(a=a, b=b, c=c, d=d, e=e, f=f)


python_return_same_values = ContainerTask(
    name="python_return_same_values",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta),
    outputs=kwtypes(a=int, b=bool, c=float, d=str, e=datetime.datetime, f=datetime.timedelta),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_same_value.py",
        "{{.inputs.a}}",
        "{{.inputs.b}}",
        "{{.inputs.c}}",
        "{{.inputs.d}}",
        "{{.inputs.e}}",
        "{{.inputs.f}}",
        "/var/outputs",
    ],
)

flyte_file_io = ContainerTask(
    name="flyte_file_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteFile),
    outputs=kwtypes(out=FlyteFile),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytefile.py",
        "{{.inputs.inputs}}",
        # "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

flyte_dir_io = ContainerTask(
    name="flyte_dir_io",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "write_flytedir.py",
        "{{.inputs.inputs}}",
        # "/var/inputs/inputs",
        "/var/outputs/out",
    ],
)

flyte_dir_out_only = ContainerTask(
    name="flyte_dir_out_only",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    # inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteDirectory),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_flytedir.py",
        "/var/outputs/out",
    ],
)

flyte_file_out_only = ContainerTask(
    name="flyte_file_out_only",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    # inputs=kwtypes(inputs=FlyteDirectory),
    outputs=kwtypes(out=FlyteFile),
    image="futureoutlier/rawcontainer:0320",
    command=[
        "python",
        "return_flytefile.py",
        "/var/outputs/out",
    ],
)

@task
def flyte_file_task() -> FlyteFile:
    with open("./a.txt", "w") as file:
        file.write("This is a.txt file.")
    return FlyteFile(path="./a.txt")


@workflow
def flyte_file_io_wf() -> FlyteFile:
    ff = flyte_file_task()
    return flyte_file_io(inputs=ff)


@task
def flyte_dir_task() -> FlyteDirectory:
    from pathlib import Path
    import flytekit
    import os

    working_dir = flytekit.current_context().working_directory
    local_dir = Path(os.path.join(working_dir, "csv_files"))
    local_dir.mkdir(exist_ok=True)
    write_file = local_dir / "a.txt"
    with open(write_file, "w") as file:
        file.write("This is for flyte dir.")
    return FlyteDirectory(path=str(local_dir))


@workflow
def flyte_dir_io_wf() -> FlyteDirectory:
    fd = flyte_dir_task()
    return flyte_dir_io(inputs=fd)


if __name__ == "__main__":
    print(flyte_dir_io_wf())
    print(flyte_file_io_wf())
    print(primitive_types(a=0, b=False, c=3.0, d="hello", e=datetime.datetime.now(),
                          f=datetime.timedelta(days=1, hours=3, minutes=2, seconds=3, microseconds=5)))
    print(flyte_dir_out_only())
    print(flyte_file_out_only())
Dockerfile
# Use the Alpine Linux version of Python 3.9 as the base image
FROM python:3.9-alpine
# Set the working directory in the container
WORKDIR /root
# Copy the test scripts into the /root working directory of the image
COPY ./write_flytefile.py /root/write_flytefile.py
COPY ./write_flytedir.py /root/write_flytedir.py
COPY ./return_same_value.py /root/return_same_value.py
COPY ./return_flytefile.py /root/return_flytefile.py
COPY ./return_flytedir.py /root/return_flytedir.py
# Specify the command to run when the container starts. Here, we use /bin/sh to start a shell.
CMD ["/bin/sh"]
write_flytefile.py
import sys
from pathlib import Path


def copy_content_to_output(input_path: Path, output_path: Path):
    content = input_path.read_text()
    output_path.write_text(content)


if __name__ == "__main__":
    if len(sys.argv) > 2:
        input_path = Path(sys.argv[1])
        output_path = Path(sys.argv[2])
        copy_content_to_output(input_path, output_path)
    else:
        print("Usage: script.py <input_path> <output_path>")
write_flytedir.py
import sys
from pathlib import Path
import shutil


def copy_directory(input_path: Path, output_path: Path):
    if not input_path.exists() or not input_path.is_dir():
        print(f"Error: {input_path} does not exist or is not a directory.")
        return
    try:
        shutil.copytree(input_path, output_path)
        print(f"Directory {input_path} was successfully copied to {output_path}")
    except Exception as e:
        print(f"Error copying {input_path} to {output_path}: {e}")


if __name__ == "__main__":
    if len(sys.argv) > 2:
        input_path = Path(sys.argv[1])
        output_path = Path(sys.argv[2])
        copy_directory(input_path, output_path)
    else:
        print("Usage: script.py <input_directory_path> <output_directory_path>")
return_same_value.py
import sys


def write_output(output_dir, output_file, v):
    with open(f"{output_dir}/{output_file}", "w") as f:
        f.write(str(v))


def main(*args, output_dir):
    # Generate output files for each input argument
    for i, arg in enumerate(args, start=1):
        # Using i to generate filenames like 'a', 'b', 'c', ...
        output_file = chr(ord('a') + i - 1)
        write_output(output_dir, output_file, arg)


if __name__ == "__main__":
    *inputs, output_dir = sys.argv[1:]  # Unpack all inputs except for the last one for output_dir
    main(*inputs, output_dir=output_dir)
return_flytefile.py
import sys
from pathlib import Path


def write_output(out_path: Path):
    out_path.write_text("return flyte file content")


if __name__ == "__main__":
    write_output(Path(sys.argv[1]))
return_flytedir.py
import sys
from pathlib import Path


def write_output(directory_path: Path):
    directory_path.mkdir(parents=True, exist_ok=True)
    file_path = directory_path / "test_flyte_dir_file"
    file_path.write_text("prove flytedir works")


if __name__ == "__main__":
    if len(sys.argv) > 1:
        write_output(Path(sys.argv[1]))
    else:
        print("Usage: script.py <output_directory_path>")
Screenshots
Support documentation example
All examples above
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [x] All new and existing tests passed.
- [x] All commits are signed-off.
Related PRs
https://github.com/flyteorg/flytekit/pull/1745
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 85.69%. Comparing base (bf38b8e) to head (a9b0e1e). Report is 6 commits behind head on master.
:exclamation: Current head a9b0e1e differs from pull request most recent head b068357. Consider uploading reports for the commit b068357 to get more accurate results
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2258      +/-   ##
==========================================
+ Coverage   83.04%   85.69%   +2.65%
==========================================
  Files         324       20     -304
  Lines       24861     1279   -23582
  Branches     3547        0    -3547
==========================================
- Hits        20645     1096   -19549
+ Misses       3591      183    -3408
+ Partials      625        0     -625
Can we check whether docker is installed and only then use it? Please do not add a docker dependency.
No problem, will do that.
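One way to honor this request is to look up the docker SDK lazily, only when a container task actually runs locally. The sketch below is an assumption about how that could look, not the code merged in this PR; `module_available` and `load_docker_sdk` are hypothetical helper names, and the module name is a parameter only so the check can be exercised without docker installed.

```python
import importlib
import importlib.util
from types import ModuleType


def module_available(name: str) -> bool:
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(name) is not None


def load_docker_sdk(module_name: str = "docker") -> ModuleType:
    """Import the docker SDK on demand, with a clear error if it is missing.

    Keeping the import inside this function means the package does not need
    docker as a hard dependency; only local container execution requires it.
    """
    if not module_available(module_name):
        raise RuntimeError(
            f"'{module_name}' is required to run container tasks locally. "
            "Install it with `pip install docker`."
        )
    return importlib.import_module(module_name)


if __name__ == "__main__":
    print("docker SDK installed:", module_available("docker"))
```

A caller would invoke `load_docker_sdk()` at the start of local execution, so users who never run container tasks locally are unaffected.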
cc @eapolinario, @pingsutw Could you please help review? I think I might have missed some edge cases. I will add unit tests once you think the approach looks good, thank you!
cc @wild-endeavor If possible, please take a look too, thank you!
Question:
- Can I add it in `dev-requirements.in`, so that we can add unit tests for local execution?
- Are we comfortable with the implementation of `def local_execute`? I think it would be better to split it into small functions; however, most of the implementation is referenced from here: https://github.com/flyteorg/flytekit/blob/329aea8545aa9ea9f903acd7e006f3f12efb9cbb/flytekit/core/base_task.py#L251-L328
- Are there any edge cases for container tasks? I assume every input should have the prefix "{{.inputs." and the suffix "}}".
First of all, thanks for your contribution! This will be a massive improvement to the local flytekit use cases.
This is shaping up to be a great PR. I left some comments along the way. But can you make sure to write tests where container tasks are used in workflows that combine regular tasks and container tasks?
Really great advice! I'll do it today, thank you!
@eapolinario
These are my updates:
- Command inputs now use an array, so `str` values no longer need to be split. (`sh -c` is removed now.)
- Added support for `datetime.timedelta`.
- Implemented regex for input parsing.
- Updated all related tests.
- Improved string to boolean conversion: `output_dict[k] = False if output_val.lower() == "false" else True`.
Thank you very much!
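For context on the boolean conversion above, here is a minimal sketch of how raw output files could be read back into typed values. `parse_output` and `read_outputs` are hypothetical names, the layout (one file per output name in the output directory) follows `return_same_value.py`, and `datetime.timedelta` parsing is left out because its string format is not covered here.

```python
import datetime
from pathlib import Path
from typing import Any, Dict, Type


def parse_output(raw: str, expected: Type) -> Any:
    """Convert the text a container wrote into the declared Python type."""
    if expected is bool:
        # Mirrors the rule above: only "false" (any casing) maps to False.
        return raw.lower() != "false"
    if expected is int:
        return int(raw)
    if expected is float:
        return float(raw)
    if expected is datetime.datetime:
        return datetime.datetime.fromisoformat(raw)
    return raw  # str and anything unhandled stay as text in this sketch


def read_outputs(output_dir: Path, output_types: Dict[str, Type]) -> Dict[str, Any]:
    """Read one file per declared output name and convert its content."""
    return {
        name: parse_output((output_dir / name).read_text(), expected)
        for name, expected in output_types.items()
    }
```

Note that with this rule any text other than "false", including an empty string, becomes `True`, so a stricter check may be preferable if containers can emit other values.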
Hi, @eapolinario Can you help approve this PR? Companies like AXIS and Tesla might want this feature, thank you!
This is looking pretty good, but we should increase the test coverage.
Can you add a separate test suite (call it `test_local_raw_container.py` under `tests/flytekit/unit/core`), add a Dockerfile there, and use it to build a local image to be used in tests? Add a few more tests to show that the manipulation of the input/output data dir works. Also, make sure to add examples where `FlyteFile` is used. This will help the implementation catch up with the current state of copilot.
I will do it today, thank you
Hi, @eapolinario,
I've implemented your suggestions:
- Added 3 Python files for testing local execution of `FlyteFile`, `FlyteDirectory`, and primitive types within container tasks.
- Created a Dockerfile to include the above test files.
- Introduced 4 tests: `test_flytefile_wf` for `FlyteFile` I/O, `test_flytedir_wf` for `FlyteDirectory` I/O, `test_primitive_types_wf` for various primitive types, and `test_input_output_dir_manipulation` to validate input/output directory handling.
Note: Despite FlyteDirectory not being supported by copilot, I've included it for local execution, ready to extend support based on user feedback.
If it looks good to you, can you help approve it?
This is a huge improvement to companies using ContainerTask, thank you!
So clean. Thank you!
Thank you both too😭😭😭