flytekit

Annotated StructuredDataset: support `nested_types`

Open austin362667 opened this issue 1 year ago • 15 comments

Tracking issue

https://github.com/flyteorg/flyte/issues/4241

Why are the changes needed?

Currently, StructuredDatasets only support flat schemas. This PR adds support for nested types expressed as a dict/JSON, a dataclass, or named args/kwargs.

What changes were proposed in this pull request?

  1. A flatten_dict() helper function in structured_dataset.py

    • Types must be passed in via kwtypes(). See the code comments.
  2. After we get the list of SUPPORTED_TYPES, we select them by a series of keys joined by "." (a dotted path).

    • How is a nested dict flattened? For example,
    {
        "a": {
            "b": {
                "c": {
                    "d": "vvv"
                }
            }
        },
        "e.f": "www",
    }

    becomes {'a.b.c.d': 'vvv', 'e.f': 'www'}.

    • Columns cannot be selected by a partial sub-key path such as "c.d".
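
The flattening rule described above can be sketched in plain Python. This is a minimal illustration only, not necessarily the implementation in this PR: the signature (`parent_key`, `sep`) is an assumption made for the example, and the real helper in structured_dataset.py may differ.

```python
from typing import Any, Dict


def flatten_dict(nested: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Collapse nested dicts into a single level, joining keys with `sep`."""
    flat: Dict[str, Any] = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into sub-dicts, carrying the accumulated key path along.
            flat.update(flatten_dict(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


nested = {"a": {"b": {"c": {"d": "vvv"}}}, "e.f": "www"}
flat = flatten_dict(nested)
print(flat)           # {'a.b.c.d': 'vvv', 'e.f': 'www'}
print("c.d" in flat)  # False: only full paths from the root are valid keys
```

Because only full paths from the root end up as keys, a partial path such as "c.d" has nothing to match against, which is consistent with the limitation noted above.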

How was this patch tested?

Please take a look at the screenshots and examples below.

Setup process

from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

# flytekit_dev_version = "https://github.com/austin362667/flytekit.git@f5cd70dd053e6f3d4aaf5b90d9c4b28f32c0980a"
# image = ImageSpec(
#     packages=[
#         "pandas",
#         "google-cloud-bigquery",
#         "google-cloud-bigquery-storage",
#         f"git+{flytekit_dev_version}",
#         f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
#     ],
#     apt_packages=["git"],
#     source_root="./keys",
#     env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
#     platform="linux/arm64",
#     registry="localhost:30000",
# )


## Case 1.
data = [
    {
        'company': 'XYZ pvt ltd',
        'location': 'London',
        'info': {
            'president': 'Rakesh Kapoor',
            'contacts': {
                'email': '[email protected]',
                'tel': '9876543210'
            }
        }
    },
    {
        'company': 'ABC pvt ltd',
        'location': 'USA',
        'info': {
            'president': 'Kapoor Rakesh',
            'contacts': {
                'email': '[email protected]',
                'tel': '0123456789'
            }
        }
    }
]

@dataclass
class ContactsField():
    email: str
    tel: str

@dataclass
class InfoField():
    president: str
    contacts: ContactsField

@dataclass
class CompanyField():
    location: str
    info: InfoField
    company: str


MyArgDataset = Annotated[StructuredDataset, kwtypes(company = str)]
MyDictDataset = Annotated[StructuredDataset, kwtypes(info = {"contacts": {"tel": str}})]
MyDictListDataset = Annotated[StructuredDataset, kwtypes(info = {"contacts": {"tel": str, "email": str}})]
MyTopDataClassDataset = Annotated[StructuredDataset, kwtypes( CompanyField )]
MySecondDataClassDataset = Annotated[StructuredDataset, kwtypes(info = InfoField)]
MyNestedDataClassDataset = Annotated[StructuredDataset, kwtypes(info = kwtypes(contacts = ContactsField))]

@task()
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", df)


    # Enable one of GCP uri below if you want!
    return StructuredDataset(
        dataframe=df,
        # uri= "gs://flyte_austin362667_bucket/nested_types"
        # uri= "bq://flyte-austin362667-gcp:dataset.nested_type"
    )

@task()
def print_table_by_arg(sd: MyArgDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyArgDataset dataframe: \n", t)
    return t

@task()
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", t)
    return t

@task()
def print_table_by_list_dict(sd: MyDictListDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictListDataset dataframe: \n", t)
    return t

@task()
def print_table_by_top_dataclass(sd: MyTopDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyTopDataClassDataset dataframe: \n", t)
    return t

@task()
def print_table_by_second_dataclass(sd: MySecondDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MySecondDataClassDataset dataframe: \n", t)
    return t

@task()
def print_table_by_nested_dataclass(sd: MyNestedDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyNestedDataClassDataset dataframe: \n", t)
    return t

@workflow
def contacts_wf():
    sd = create_bq_table()
    print_table_by_arg(sd=sd)
    print_table_by_dict(sd=sd)
    print_table_by_list_dict(sd=sd)
    print_table_by_top_dataclass(sd=sd)
    print_table_by_second_dataclass(sd=sd)
    print_table_by_nested_dataclass(sd=sd)
    return 



## Case 2.
@dataclass
class Levels():
    # level1: str
    level2: str

Schema = Annotated[StructuredDataset, kwtypes(age=int, levels=Levels)]

@task()
def mytask_w() -> StructuredDataset:
    df = pd.DataFrame({
        "age": [1, 2],
        "levels": [
            {"level1": "1", "level2": "2"},
            {"level1": "2", "level2": "4"}
        ]
    })
    return StructuredDataset(dataframe=df)

# Should only show level2 string..
@task()
def mytask_r(sd: Schema) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("dataframe: \n", t)
    return t


@workflow
def levels_wf():
    sd = mytask_w()
    mytask_r(sd=sd)
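
One way to read the dataclass-based annotations in Case 1 is that each leaf field of a nested dataclass corresponds to one dotted column path. The sketch below uses only the standard library to derive those paths; the helper name `dataclass_columns` is hypothetical and is not part of flytekit or of this PR.

```python
from dataclasses import dataclass, fields, is_dataclass
from typing import Any, Dict


@dataclass
class ContactsField:
    email: str
    tel: str


@dataclass
class InfoField:
    president: str
    contacts: ContactsField


@dataclass
class CompanyField:
    location: str
    info: InfoField
    company: str


def dataclass_columns(cls: type, prefix: str = "") -> Dict[str, Any]:
    """Map each leaf field of a (possibly nested) dataclass to its dotted path."""
    columns: Dict[str, Any] = {}
    for f in fields(cls):
        path = f"{prefix}.{f.name}" if prefix else f.name
        if is_dataclass(f.type):
            # Nested dataclass: descend and extend the path.
            columns.update(dataclass_columns(f.type, path))
        else:
            columns[path] = f.type
    return columns


columns = dataclass_columns(CompanyField)
print(list(columns))
# ['location', 'info.president', 'info.contacts.email', 'info.contacts.tel', 'company']
```

Note that this relies on field annotations being real types; with `from __future__ import annotations` they would be strings and would need to be resolved first.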

Screenshots

Screenshot 2024-03-27 at 4 31 18 PM Screenshot 2024-03-15 at 6 30 58 PM

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [x] All new and existing tests passed.
  • [x] All commits are signed-off.

Related PRs

Docs link

austin362667 avatar Mar 10 '24 10:03 austin362667

from dataclasses import dataclass

import pandas as pd
from flytekit import ImageSpec, kwtypes, task, workflow
from flytekit.types.structured import StructuredDataset
from typing_extensions import Annotated

# custom_image = ImageSpec(
#     name="basic:latest",
#     registry="localhost:30000",
#     packages=["pandas", "numpy"], 
#     apt_packages=['git'],
#     python_version="3.11",
# )

# if custom_image.is_container():
    # import pandas as pd

@dataclass
class DetailField():
    age: int
    sex: str

@dataclass
class RecordField():
    name: str
    detail: DetailField
    # city: str

Schema = Annotated[StructuredDataset, kwtypes({"Name": str}, {"Detail": {"Age": int, "Sex": str}})]
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField}, Name=str)]
# Schema = Annotated[StructuredDataset, kwtypes(Name=str, Detail={"Age": int, "Sex": str})]
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": {"Age": int, "Sex": str}})]
# Schema = Annotated[StructuredDataset, kwtypes(Detail=DetailField)]
# Schema = Annotated[StructuredDataset, kwtypes(Name=str)]
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField},{"City": str},{"City": str}, Name=str)]


@task
def write_df() -> Schema:
    df = pd.DataFrame({"Name": ["Tom", "Joseph", "Alyssa"], "Detail": [{"Age":20, "Sex": "M"}, {"Age":22, "Sex": "M"}, {"Age":24, "Sex": "F"}], "City": ["Madrid","Paris","London"]})
    return StructuredDataset(dataframe=df)

@task
def read_df(structds: Schema):
    df = structds.open(pd.DataFrame).all()
    print(df)

@workflow
def struct_record_wf():
    out = write_df()
    read_df(structds=out)

if __name__ == "__main__":
    struct_record_wf()

austin362667 avatar Mar 10 '24 10:03 austin362667

FROM python:3.9-slim-buster
USER root
WORKDIR /root
ENV PYTHONPATH /root
RUN apt-get update && apt-get install build-essential -y
RUN apt-get install git -y
RUN pip install -U git+https://github.com/austin362667/flytekit.git@eb4e2c2d25ed57a138a8640fc7f059eebf393241
RUN pip install -U pandas

austin362667 avatar Mar 10 '24 10:03 austin362667

testing: Schema = Annotated[StructuredDataset, kwtypes({"Name": str}, {"Detail": {"Age": int, "Sex": str}})]

Screenshot 2024-03-10 at 6 07 52 PM Screenshot 2024-03-10 at 6 08 15 PM

austin362667 avatar Mar 10 '24 10:03 austin362667

testing: Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField}, Name=str)]

Screenshot 2024-03-10 at 6 16 18 PM Screenshot 2024-03-10 at 6 16 26 PM

austin362667 avatar Mar 10 '24 10:03 austin362667

testing: Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField},{"City": str},{"City": str}, Name=str)]

Screenshot 2024-03-10 at 6 18 33 PM Screenshot 2024-03-10 at 6 18 41 PM

austin362667 avatar Mar 10 '24 10:03 austin362667

@gitgraghu, @dylanwilder Please help me check for any unexpected behavior. Thank you~ cc @pingsutw

austin362667 avatar Mar 10 '24 10:03 austin362667

Codecov Report

All modified and coverable lines are covered by tests :white_check_mark:

Project coverage is 97.40%. Comparing base (bf38b8e) to head (facdeb8). Report is 43 commits behind head on master.

:exclamation: Current head facdeb8 differs from pull request most recent head a6e469a. Consider uploading reports for the commit a6e469a to get more accurate results

Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2252       +/-   ##
===========================================
+ Coverage   83.04%   97.40%   +14.36%     
===========================================
  Files         324        9      -315     
  Lines       24861      231    -24630     
  Branches     3547        0     -3547     
===========================================
- Hits        20645      225    -20420     
+ Misses       3591        6     -3585     
+ Partials      625        0      -625     

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov[bot] avatar Mar 11 '24 19:03 codecov[bot]

@austin362667 it doesn't work for me. Try the example below:

from typing import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

data = {
    'company': 'XYZ pvt ltd',
    'location': 'London',
    'info': {
        'president': 'Rakesh Kapoor',
        'contacts': {
            'email': '[email protected]',
            'tel': '9876543210'
        }
    }
}


# MyDataset = Annotated[StructuredDataset, kwtypes(company=str)]
MyDataset = Annotated[StructuredDataset, kwtypes(info={"president": str})]

@task
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("dataframe: \n", df)
    return StructuredDataset(dataframe=df)
    # return StructuredDataset(
    #     dataframe=df, uri="bq://dogfood-gcp-dataplane:dataset.nested_type"
    # )


@task
def print_table(sd: MyDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print(t)
    return t


@workflow
def wf():
    sd = create_bq_table()
    print_table(sd=sd)

pingsutw avatar Mar 13 '24 21:03 pingsutw

> @austin362667 it doesn't work for me. Try below example,

@pingsutw Thank you for providing these useful use cases. These examples now work in remote mode. BigQuery or pyarrow is still the tricky part.

austin362667 avatar Mar 14 '24 18:03 austin362667

  • Original DataFrame

      data = {
          'company': 'XYZ pvt ltd',
          'location': 'London',
          'info': {
              'president': 'Rakesh Kapoor',
              'contacts': {
                  'email': '[email protected]',
                  'tel': '9876543210'
              }
          }
      }
    
    Screenshot 2024-03-15 at 6 31 50 PM
  • By Dict Screenshot 2024-03-15 at 6 33 23 PM

  • By Dataclass Screenshot 2024-03-15 at 6 33 54 PM
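
As a rough, flytekit-free illustration of what selecting "by dict" means for a single record, the sketch below keeps only the keys named in a nested type spec. The function `select_nested` is hypothetical and exists only for this example. Whether the PR returns the selected sub-fields as nested values or as flattened columns is not stated in the text, so the nested output here is an assumption.

```python
from typing import Any, Dict


def select_nested(record: Dict[str, Any], spec: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the keys of `record` named in `spec`, recursing into sub-dicts."""
    selected: Dict[str, Any] = {}
    for key, sub_spec in spec.items():
        if isinstance(sub_spec, dict):
            # A dict in the spec means "descend and select sub-fields".
            selected[key] = select_nested(record[key], sub_spec)
        else:
            # A leaf in the spec (e.g. `str`) means "keep this value".
            selected[key] = record[key]
    return selected


record = {
    "company": "XYZ pvt ltd",
    "location": "London",
    "info": {"president": "Rakesh Kapoor", "contacts": {"tel": "9876543210"}},
}
spec = {"info": {"contacts": {"tel": str}}}
result = select_nested(record, spec)
print(result)  # {'info': {'contacts': {'tel': '9876543210'}}}
```

The spec mirrors the shape passed to kwtypes(info={"contacts": {"tel": str}}) in the examples, with the leaf types used only as markers.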

austin362667 avatar Mar 15 '24 10:03 austin362667

@gitgraghu @dylanwilder Feel free to try it and see whether it works in your GCP BigQuery scenario. Thank you!

It worked with my own bucket and BigQuery table under my GCP account, in both local and remote modes. However, I'm not 100% sure about other cases.

from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec

flytekit_dev_version = "https://github.com/austin362667/flytekit.git@90a19fc51d1b0eb77b020140810883a317432675"
image = ImageSpec(
    packages=[
        "pandas",
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        f"git+{flytekit_dev_version}",
        f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
    ],
    apt_packages=["git"],
    files=["./keys/gcp-service-account.json"],
    env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
    platform="linux/arm64",
    registry="localhost:30000",
)

data = [{
    'company': 'XYZ pvt ltd',
    'location': 'London',
    'info': {
        'president': 'Rakesh Kapoor',
        'contacts': {
            'email': '[email protected]',
            'tel': '9876543210'
        }
    }
},
{
    'company': 'ABC pvt ltd',
    'location': 'USA',
    'info': {
        'president': 'Kapoor Rakesh',
        'contacts': {
            'email': '[email protected]',
            'tel': '0123456789'
        }
    }
}
]

@dataclass
class ContactsField():
    # email: str
    tel: str

@dataclass
class InfoField():
    # president: str
    contacts: ContactsField

@dataclass
class CompanyField():
    company: str
    # location: str
    # info: InfoField

# MyArgDataset = Annotated[StructuredDataset, kwtypes(company=str)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str})]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(CompanyField)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str, "contacts":{"email":str}})]
MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"contacts":{"tel":str}})]
MyDataClassDataset = Annotated[StructuredDataset, kwtypes(info=kwtypes(contacts=ContactsField))]

@task(container_image=image)
def create_bq_table() -> StructuredDataset:
    df = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", df)
    # return StructuredDataset(dataframe=df)
    return StructuredDataset(
        dataframe=df,
        # uri="gs://flyte_austin362667_bucket/nested_types"
        uri= "bq://flyte-austin362667-gcp:dataset.nested_type"
    )

@task(container_image=image)
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", t)
    return t

@task(container_image=image)
def print_table_by_dataclass(sd: MyDataClassDataset) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("MyDataClassDataset dataframe: \n", t)
    return t

@workflow
def wf():
    sd = create_bq_table()
    print_table_by_dict(sd=sd)
    print_table_by_dataclass(sd=sd)

austin362667 avatar Mar 16 '24 12:03 austin362667

From the Slack discussion:

@dataclass
class Levels():
    # level1: str
    level2: str
Schema = Annotated[StructuredDataset, kwtypes(age=int, levels=Levels)]

@task(container_image=image)
def mytask_w() -> StructuredDataset:
    df = pd.DataFrame({
        "age": [1, 2],
        "levels": [
            {"level1": "1", "level2": "2"},
            {"level1": "2", "level2": "4"}
        ]
    })
    return StructuredDataset(dataframe=df)

@task(container_image=image)
def mytask_r(sd: Schema) -> pd.DataFrame:
    t = sd.open(pd.DataFrame).all()
    print("dataframe: \n", t)
    return t


@workflow
def mywf():
    sd = mytask_w()
    mytask_r(sd=sd)

  1. Local Screenshot 2024-03-17 at 3 41 44 AM

  2. Remote Screenshot 2024-03-17 at 3 42 01 AM

Screenshot 2024-03-17 at 3 42 34 AM Screenshot 2024-03-17 at 3 42 52 AM

austin362667 avatar Mar 16 '24 19:03 austin362667

  1. Add a files field to ImageSpec() in order to copy the gcp_keys.json file into the container.
    • Fix the related test case: the tag hash changed in ./tests/flytekit/unit/cli/pyflyte/test_run.py
      • CI failed; to reproduce, run pytest ./tests/flytekit/unit/cli/pyflyte/test_run.py -vv

austin362667 avatar Mar 19 '24 11:03 austin362667

@pingsutw Should we leverage FlyteFile, as mentioned by @Future-Outlier, so that we avoid modifying ImageSpec() by adding a files property?

austin362667 avatar Mar 19 '24 12:03 austin362667

Or split the ImageSpec() modification into a separate PR (and merge it first, since this PR depends on it).

austin362667 avatar Mar 19 '24 12:03 austin362667