Override Serialization/Deserialization Behavior for `FlyteTypes` by `mashumaro`

Open • Future-Outlier opened this issue 1 year ago • 1 comment

Why are the changes needed?

  1. Remove `from_json` and `to_json` from the dataclass transformer so that we can eventually drop the `dataclasses_json` dependency.
  2. Use mashumaro's `SerializableType` interface so that plugins such as Pydantic can be supported more easily in the future.
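For a dataclass whose fields are plain JSON types, the `to_json`/`from_json` helpers that `dataclasses_json` provided can be replaced with the standard library alone. A minimal sketch, assuming flat fields only (nested dataclasses and Flyte types are what the `SerializableType` hooks are meant to cover); `to_json`, `from_json`, and `Person` are hypothetical names, not flytekit functions:

```python
import json
from dataclasses import asdict, dataclass, fields


@dataclass
class Person:  # plain dataclass: no @dataclass_json decorator needed
    name: str
    age: int


def to_json(obj) -> str:
    # asdict() recursively converts the dataclass instance into plain dicts/values
    return json.dumps(asdict(obj))


def from_json(cls, payload: str):
    data = json.loads(payload)
    # Keep only keys that correspond to declared fields of cls
    names = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in names})


s = to_json(Person(name="Tom", age=30))
print(s)  # {"name": "Tom", "age": 30}
assert from_json(Person, s) == Person(name="Tom", age=30)
```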

What changes were proposed in this pull request?

  1. Remove `from_json` and `to_json` from the dataclass transformer.
  2. Use mashumaro's `SerializableType` interface to define serialization for Flyte types.

Flyte types covered: `FlyteFile`, `FlyteDirectory`, and `StructuredDataset`.

mashumaro reference: https://github.com/Fatal1ty/mashumaro?tab=readme-ov-file#serializabletype-interface
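mashumaro's `SerializableType` interface lets a class control its own representation through two hooks: an instance method `_serialize` and a classmethod `_deserialize`. The sketch below mimics that contract with the standard library only, so it runs without mashumaro installed. `FileRef` is a hypothetical stand-in for `FlyteFile`, and `encode`/`decode` are hypothetical stand-ins for the code mashumaro generates; with mashumaro itself, `FileRef` would subclass `mashumaro.types.SerializableType` instead.

```python
import json
from dataclasses import dataclass, fields, is_dataclass
from typing import Any


class FileRef:
    """Hypothetical FlyteFile stand-in that follows the SerializableType hooks."""

    def __init__(self, path: str):
        self.path = path

    def _serialize(self) -> dict:
        # Only the location is serialized, never the file contents
        return {"path": self.path}

    @classmethod
    def _deserialize(cls, value: dict) -> "FileRef":
        return cls(path=value["path"])

    def __eq__(self, other: object) -> bool:
        return isinstance(other, FileRef) and other.path == self.path


@dataclass
class DC1FILE:
    flyteFile1: FileRef


def encode(obj: Any) -> Any:
    # A type that defines _serialize decides its own representation
    if hasattr(obj, "_serialize"):
        return obj._serialize()
    if is_dataclass(obj):
        return {f.name: encode(getattr(obj, f.name)) for f in fields(obj)}
    return obj


def decode(cls: type, data: Any) -> Any:
    # Mirror of encode: dispatch on the declared field type
    if hasattr(cls, "_deserialize"):
        return cls._deserialize(data)
    if is_dataclass(cls):
        return cls(**{f.name: decode(f.type, data[f.name]) for f in fields(cls)})
    return data


dc = DC1FILE(flyteFile1=FileRef("s3://my-s3-bucket/a/example.txt"))
payload = json.dumps(encode(dc))
print(payload)  # {"flyteFile1": {"path": "s3://my-s3-bucket/a/example.txt"}}
assert decode(DC1FILE, json.loads(payload)) == dc
```

Because the dataclass encoder only asks "does this value know how to serialize itself?", new types (or plugins such as Pydantic models) can be supported by implementing the two hooks rather than by special-casing them in the transformer.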

How was this patch tested?

Local execution, remote execution, and unit tests.

Setup process

Python example

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Annotated, Optional

import flytekit
import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import kwtypes, task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset

all_cols = kwtypes(Name=str, Age=int, Height=int)

@dataclass_json
@dataclass
class DC1SD:
    sd: Annotated[StructuredDataset, all_cols]

@task
def generate_pandas_df(a: int) -> pd.DataFrame:
    return pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [a, 22], "Height": [160, 178]})

@task
def get_subset_pandas_df(df: Annotated[StructuredDataset, all_cols]) -> Annotated[StructuredDataset, all_cols]:
    df = df.open(pd.DataFrame).all()
    df = pd.concat([df, pd.DataFrame([[30]], columns=["Age"])])
    return StructuredDataset(dataframe=df)

@task
def sd2dc(df: Annotated[StructuredDataset, all_cols]) -> DC1SD:
    return DC1SD(sd=df)

@workflow
def sd_wf() -> DC1SD:
    df1 = generate_pandas_df(1)
    df2 = get_subset_pandas_df(df=df1)
    return sd2dc(df=df2)

@dataclass_json
@dataclass
class DC2:
    flyteFile: FlyteFile
    flyteDir: FlyteDirectory
    optional_flyteFile: Optional[FlyteFile] = None
    optional_flyteDir: Optional[FlyteDirectory] = None

@dataclass_json
@dataclass
class DC1FILE:
    flyteFile1: FlyteFile

@dataclass_json
@dataclass
class DC2FILE:
    flyteFile1: FlyteFile
    flyteFile2: FlyteFile

@task
def create_local_txt_file() -> FlyteFile:
    file_path = "./example.txt"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as f:
        f.write("Default content")
    return FlyteFile(path=file_path)

@task
def get_remote_txt_file(f: FlyteFile) -> FlyteFile:
    with open(f, newline="") as nf:
        print(nf.read())
    return f

@task
def create_dir() -> FlyteDirectory:
    return FlyteDirectory(path=flytekit.current_context().working_directory)

@task
def create_remote_dir() -> FlyteDirectory:
    return FlyteDirectory(path="s3://my-s3-bucket/a/")

@task
def t1(f: FlyteFile, d: FlyteDirectory) -> DC2:
    return DC2(flyteFile=f, flyteDir=d)

@task
def t2(f1: FlyteFile, f2: FlyteFile) -> DC2FILE:
    return DC2FILE(flyteFile1=f1, flyteFile2=f2)

@task
def t3(f1: FlyteFile) -> DC1FILE:
    return DC1FILE(flyteFile1=f1)

@workflow
def wf_DC2FILE() -> DC2FILE:
    local_f = create_local_txt_file()
    remote_f = get_remote_txt_file(f=local_f)
    return t2(f1=local_f, f2=remote_f)

@workflow
def wf_DC1FILE(f: FlyteFile = FlyteFile(path="s3://my-s3-bucket/a/example.txt")) -> DC1FILE:
    f = get_remote_txt_file(f=f)
    return t3(f1=f)

# The sub-workflows are defined above so their names are bound before `wf` references them
@workflow
def wf() -> DC2:
    f = create_local_txt_file()
    d = create_dir()
    rd = create_remote_dir()
    a = t1(f=f, d=rd)  # local file + remote directory
    b = t1(f=f, d=d)  # local file + local directory
    wf_DC2FILE()
    wf_DC1FILE()
    sd_wf()
    return b
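`DC2` declares `optional_flyteFile: Optional[FlyteFile] = None`, so a decoder has to unwrap `Optional[...]` before it can call the inner type's `_deserialize`, and it must pass `None` through untouched. A minimal stdlib sketch of that unwrapping step, assuming `None` is serialized as JSON `null` and that a missing key falls back to the dataclass default; `FileRef`, `DC2Like`, `decode_field`, and `decode` are hypothetical names, not flytekit or mashumaro APIs:

```python
from dataclasses import dataclass, fields
from typing import Any, Optional, Union, get_args, get_origin


class FileRef:
    """Hypothetical FlyteFile stand-in with a SerializableType-style hook."""

    def __init__(self, path: str):
        self.path = path

    @classmethod
    def _deserialize(cls, value: dict) -> "FileRef":
        return cls(path=value["path"])


@dataclass
class DC2Like:
    flyteFile: FileRef
    optional_flyteFile: Optional[FileRef] = None


def decode_field(tp: Any, value: Any) -> Any:
    # Optional[T] is Union[T, None]; this sketch only handles that two-member case
    if get_origin(tp) is Union:
        if value is None:
            return None
        (inner,) = [a for a in get_args(tp) if a is not type(None)]
        return decode_field(inner, value)
    if hasattr(tp, "_deserialize"):
        return tp._deserialize(value)
    return value


def decode(cls: type, data: dict) -> Any:
    # Fields absent from the payload keep their dataclass defaults
    kwargs = {f.name: decode_field(f.type, data[f.name]) for f in fields(cls) if f.name in data}
    return cls(**kwargs)


dc = decode(DC2Like, {"flyteFile": {"path": "s3://my-s3-bucket/a/example.txt"}, "optional_flyteFile": None})
print(dc.flyteFile.path, dc.optional_flyteFile)  # s3://my-s3-bucket/a/example.txt None
```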

Screenshots

Remote execution: [screenshot]

Local execution: [screenshot]

Check all the applicable boxes

  • [x] I updated the documentation accordingly.
  • [x] All new and existing tests passed.
  • [x] All commits are signed-off.

Related PRs

Docs link

Future-Outlier, Jul 03 '24 18:07

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 89.56%. Comparing base (5b3e725) to head (8355815). Report is 3 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2554       +/-   ##
===========================================
+ Coverage   41.71%   89.56%   +47.84%     
===========================================
  Files         183       95       -88     
  Lines       18728     4110    -14618     
  Branches     3915        0     -3915     
===========================================
- Hits         7813     3681     -4132     
+ Misses      10782      429    -10353     
+ Partials      133        0      -133     
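The percentages in the diff can be cross-checked from the hit, miss, and partial counts. The sketch below assumes Codecov's coverage ratio of hits / (hits + misses + partials) and assumes the displayed percentages are truncated to two decimals. Under those assumptions it reproduces the 41.71% base figure (ordinary rounding would give 41.72%), the 89.56% head figure, and the +47.84 delta:

```python
from decimal import ROUND_DOWN, Decimal


def raw_pct(hits: int, misses: int, partials: int) -> Decimal:
    # Partially covered lines count in the denominator but not in the numerator
    return Decimal(hits) * 100 / Decimal(hits + misses + partials)


def shown(pct: Decimal) -> Decimal:
    # Truncate (not round) to two decimals
    return pct.quantize(Decimal("0.01"), rounding=ROUND_DOWN)


base = raw_pct(hits=7813, misses=10782, partials=133)  # master
head = raw_pct(hits=3681, misses=429, partials=0)  # PR head
print(shown(base), shown(head), shown(head - base))  # 41.71 89.56 47.84
```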


codecov[bot], Jul 04 '24 05:07