flytekit
Override Serialization/Deserialization Behavior for `FlyteTypes` by `mashumaro`
Why are the changes needed?
- Remove `from_json` and `to_json` from the dataclass transformer so that we can eventually drop the `dataclasses_json` dependency.
- Use mashumaro's `SerializableType` interface so that plugins such as Pydantic are easier to support in the future.
What changes were proposed in this pull request?
- Remove `from_json` and `to_json` from the dataclass transformer.
- Use mashumaro's `SerializableType` interface to define our serialization for the Flyte types: `FlyteFile`, `FlyteDirectory`, and `StructuredDataset`.

mashumaro reference: https://github.com/Fatal1ty/mashumaro?tab=readme-ov-file#serializabletype-interface
How was this patch tested?
Local execution, remote execution, and unit tests.
Setup process
Python example:

```python
import os
from dataclasses import dataclass
from typing import Annotated, Optional

import flytekit
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import kwtypes, task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset

# Column schema shared by every StructuredDataset annotation below.
all_cols = kwtypes(Name=str, Age=int, Height=int)


@dataclass_json
@dataclass
class DC1SD:
    sd: Annotated[StructuredDataset, all_cols]


@task
def generate_pandas_df(a: int) -> pd.DataFrame:
    return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]})


@task
def get_subset_pandas_df(df: Annotated[StructuredDataset, all_cols]) -> Annotated[StructuredDataset, all_cols]:
    df = df.open(pd.DataFrame).all()
    # Append a row that only sets "Age"; the other columns become NaN.
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    return StructuredDataset(dataframe=df)


@task
def sd2dc(df: Annotated[StructuredDataset, all_cols]) -> DC1SD:
    # Return a StructuredDataset nested inside a dataclass.
    return DC1SD(sd=df)


@workflow
def sd_wf() -> DC1SD:
    df1 = generate_pandas_df(1)
    df2 = get_subset_pandas_df(df=df1)
    return sd2dc(df=df2)


@dataclass_json
@dataclass
class DC2:
    flyteFile: FlyteFile
    flyteDir: FlyteDirectory
    optional_flyteFile: Optional[FlyteFile] = None
    optional_flyteDir: Optional[FlyteDirectory] = None


@dataclass_json
@dataclass
class DC1FILE:
    flyteFile1: FlyteFile


@dataclass_json
@dataclass
class DC2FILE:
    flyteFile1: FlyteFile
    flyteFile2: FlyteFile


@task
def create_local_txt_file() -> FlyteFile:
    file_path = "./example.txt"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as f:
        f.write("Default content")
    return FlyteFile(path=file_path)


@task
def get_remote_txt_file(f: FlyteFile) -> FlyteFile:
    # Opening a FlyteFile downloads it first if it points to a remote location.
    with open(f, newline="") as nf:
        print(nf.read())
    return f


@task
def create_dir() -> FlyteDirectory:
    # Return the task's local working directory as a FlyteDirectory output.
    return FlyteDirectory(path=flytekit.current_context().working_directory)


@task
def create_remote_dir() -> FlyteDirectory:
    return FlyteDirectory(path="s3://my-s3-bucket/a/")


@task
def t1(f: FlyteFile, d: FlyteDirectory) -> DC2:
    return DC2(flyteFile=f, flyteDir=d)


@task
def t2(f1: FlyteFile, f2: FlyteFile) -> DC2FILE:
    return DC2FILE(flyteFile1=f1, flyteFile2=f2)


@task
def t3(f1: FlyteFile) -> DC1FILE:
    return DC1FILE(flyteFile1=f1)


@workflow
def wf_DC2FILE() -> DC2FILE:
    local_f = create_local_txt_file()
    remote_f = get_remote_txt_file(f=local_f)
    return t2(f1=local_f, f2=remote_f)


@workflow
def wf_DC1FILE(f: FlyteFile = FlyteFile(path="s3://my-s3-bucket/a/example.txt")) -> DC1FILE:
    f = get_remote_txt_file(f=f)
    return t3(f1=f)


# Defined last so that the sub-workflows it calls already exist when its body is compiled.
@workflow
def wf() -> DC2:
    f = create_local_txt_file()
    d = create_dir()
    rd = create_remote_dir()
    a = t1(f=f, d=rd)  # DC2 holding a remote FlyteDirectory
    b = t1(f=f, d=d)  # DC2 holding a local FlyteDirectory
    wf_DC2FILE()
    wf_DC1FILE()
    sd_wf()
    return b
```
Screenshots
remote execution
Check all the applicable boxes
- [x] I updated the documentation accordingly.
- [x] All new and existing tests passed.
- [x] All commits are signed-off.
Related PRs
Docs link
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 89.56%. Comparing base (5b3e725) to head (8355815). Report is 3 commits behind head on master.
Additional details and impacted files
```diff
@@             Coverage Diff             @@
##           master    #2554       +/-   ##
===========================================
+ Coverage   41.71%   89.56%   +47.84%
===========================================
  Files         183       95       -88
  Lines       18728     4110    -14618
  Branches     3915        0     -3915
===========================================
- Hits         7813     3681     -4132
+ Misses      10782      429    -10353
+ Partials      133        0      -133
```
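The percentages in the report can be recomputed from the hit, miss, and partial counts in the diff. Below is a stdlib-only sketch. It assumes that coverage is hits / (hits + misses + partials) and that Codecov's default settings apply: two-decimal precision, rounded down.

```python
import math
from fractions import Fraction


def coverage_pct(hits: int, misses: int, partials: int) -> Fraction:
    """Exact coverage as a percentage: hits / (hits + misses + partials) * 100."""
    return Fraction(100 * hits, hits + misses + partials)


def round_down(value: Fraction, precision: int = 2) -> str:
    """Round toward negative infinity at the given number of decimals."""
    scale = 10**precision
    return f"{math.floor(value * scale) / scale:.{precision}f}"


base = coverage_pct(hits=7813, misses=10782, partials=133)  # master
head = coverage_pct(hits=3681, misses=429, partials=0)  # PR head

print(round_down(base), round_down(head), round_down(head - base))
# 41.71 89.56 47.84
```

Subtracting the already rounded figures (89.56 - 41.71) would give 47.85. The reported +47.84 matches the difference of the exact ratios instead.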