Annotated StructuredDataset: support `nested_types`
Tracking issue
https://github.com/flyteorg/flyte/issues/4241
Why are the changes needed?
Currently StructuredDatasets only support flat schemas.
This PR aims to support nested types in the form of dict/JSON, dataclasses, and named args/kwargs.
What changes were proposed in this pull request?
- A `flatten_dict()` tool function in `structured_dataset.py`.
  - Must use `kwtypes()` to pass in types. Check comments.
- After we get a list of `SUPPORTED_TYPES`, we select them by a series of keys joined by `.`.
  - How to flatten? e.g., `{ "a": { "b": { "c": { "d": "vvv" } } }, "e.f": "www", }` to `{'a.b.c.d': 'vvv', 'e.f': 'www'}`.
  - Cannot select by sub-keys, like `"c.d"`.
How was this patch tested?
Please take a look at the screenshots and examples below.
Setup process
from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec
# flytekit_dev_version = "https://github.com/austin362667/flytekit.git@f5cd70dd053e6f3d4aaf5b90d9c4b28f32c0980a"
# image = ImageSpec(
# packages=[
# "pandas",
# "google-cloud-bigquery",
# "google-cloud-bigquery-storage",
# f"git+{flytekit_dev_version}",
# f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
# ],
# apt_packages=["git"],
# source_root="./keys",
# env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
# platform="linux/arm64",
# registry="localhost:30000",
# )
# --- Case 1: nested company records ------------------------------------------
# Two rows whose "info" value is itself a dict holding a president and a
# nested "contacts" dict.
# NOTE(review): both email values read "[email protected]", which looks like
# an email-obfuscation artifact from copying this snippet off a web page;
# substitute real addresses if the email column matters for the demo.
data = [
    {
        "company": "XYZ pvt ltd",
        "location": "London",
        "info": {
            "president": "Rakesh Kapoor",
            "contacts": {
                "email": "[email protected]",
                "tel": "9876543210",
            },
        },
    },
    {
        "company": "ABC pvt ltd",
        "location": "USA",
        "info": {
            "president": "Kapoor Rakesh",
            "contacts": {
                "email": "[email protected]",
                "tel": "0123456789",
            },
        },
    },
]
@dataclass
class ContactsField:
    """Schema of the ``contacts`` dict nested inside ``info``."""

    email: str
    tel: str


@dataclass
class InfoField:
    """Schema of the ``info`` column: a president plus a nested contacts dict."""

    president: str
    contacts: ContactsField


@dataclass
class CompanyField:
    """Full row schema covering the location, info and company columns."""

    location: str
    info: InfoField
    company: str
# Each alias narrows the same StructuredDataset to a different column
# selection. Per this PR, nested keys are flattened and joined with "."
# (so the dict form selecting {"contacts": {"tel": str}} under "info"
# targets "info.contacts.tel").
MyArgDataset = Annotated[StructuredDataset, kwtypes(company=str)]
MyDictDataset = Annotated[
    StructuredDataset,
    kwtypes(info={"contacts": {"tel": str}}),
]
MyDictListDataset = Annotated[
    StructuredDataset,
    kwtypes(info={"contacts": {"tel": str, "email": str}}),
]
MyTopDataClassDataset = Annotated[StructuredDataset, kwtypes(CompanyField)]
MySecondDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
MyNestedDataClassDataset = Annotated[
    StructuredDataset,
    kwtypes(info=kwtypes(contacts=ContactsField)),
]
@task()
def create_bq_table() -> StructuredDataset:
    """Build the Case 1 table and hand it back as a StructuredDataset."""
    # max_level=0 keeps "info" as a single column of dicts instead of
    # expanding it into dotted sub-columns.
    frame = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", frame)
    # To persist to GCP instead, add one of these as ``uri=``:
    #   "gs://flyte_austin362667_bucket/nested_types"
    #   "bq://flyte-austin362667-gcp:dataset.nested_type"
    return StructuredDataset(dataframe=frame)
def _open_and_print(sd: StructuredDataset, label: str) -> pd.DataFrame:
    """Load *sd* as a pandas DataFrame, print it under *label*, and return it.

    Shared body for the reader tasks below. They differ only in the
    ``Annotated`` input type, which is expected to carry the column selection.
    """
    frame = sd.open(pd.DataFrame).all()
    print(f"{label} dataframe: \n", frame)
    return frame


@task()
def print_table_by_arg(sd: MyArgDataset) -> pd.DataFrame:
    """Read the table through the ``MyArgDataset`` projection."""
    return _open_and_print(sd, "MyArgDataset")


@task()
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    """Read the table through the ``MyDictDataset`` projection."""
    return _open_and_print(sd, "MyDictDataset")


@task()
def print_table_by_list_dict(sd: MyDictListDataset) -> pd.DataFrame:
    """Read the table through the ``MyDictListDataset`` projection."""
    return _open_and_print(sd, "MyDictListDataset")


@task()
def print_table_by_top_dataclass(sd: MyTopDataClassDataset) -> pd.DataFrame:
    """Read the table through the ``MyTopDataClassDataset`` projection."""
    return _open_and_print(sd, "MyTopDataClassDataset")


@task()
def print_table_by_second_dataclass(sd: MySecondDataClassDataset) -> pd.DataFrame:
    """Read the table through the ``MySecondDataClassDataset`` projection."""
    return _open_and_print(sd, "MySecondDataClassDataset")


@task()
def print_table_by_nested_dataclass(sd: MyNestedDataClassDataset) -> pd.DataFrame:
    """Read the table through the ``MyNestedDataClassDataset`` projection."""
    return _open_and_print(sd, "MyNestedDataClassDataset")
@workflow
def contacts_wf():
    """Write the Case 1 table once, then read it back through every projection."""
    dataset = create_bq_table()
    print_table_by_arg(sd=dataset)
    print_table_by_dict(sd=dataset)
    print_table_by_list_dict(sd=dataset)
    print_table_by_top_dataclass(sd=dataset)
    print_table_by_second_dataclass(sd=dataset)
    print_table_by_nested_dataclass(sd=dataset)
# --- Case 2: keep a single key of a dict column -------------------------------
@dataclass
class Levels:
    """Projection of the ``levels`` dict column that keeps only ``level2``."""

    # level1 is deliberately omitted so that reads drop it.
    # level1: str
    level2: str
Schema = Annotated[
    StructuredDataset,
    kwtypes(age=int, levels=Levels),  # age column + the Levels projection
]
@task()
def mytask_w() -> StructuredDataset:
    """Write a two-row table whose ``levels`` column holds dicts."""
    levels = [
        {"level1": "1", "level2": "2"},
        {"level1": "2", "level2": "4"},
    ]
    frame = pd.DataFrame({"age": [1, 2], "levels": levels})
    return StructuredDataset(dataframe=frame)
@task()
def mytask_r(sd: Schema) -> pd.DataFrame:
    """Read the table back through ``Schema``.

    Expected: only the ``level2`` string remains inside ``levels``.
    """
    frame = sd.open(pd.DataFrame).all()
    print("dataframe: \n", frame)
    return frame
@workflow
def levels_wf():
    """Write the Case 2 table and read it back through ``Schema``."""
    written = mytask_w()
    mytask_r(sd=written)
Screenshots
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [x] All new and existing tests passed.
- [x] All commits are signed-off.
Related PRs
Docs link
from dataclasses import dataclass
from flytekit import task, workflow, kwtypes
from flytekit.types.structured import StructuredDataset
from typing_extensions import Annotated
from flytekit import ImageSpec, task
import pandas as pd
# custom_image = ImageSpec(
# name="basic:latest",
# registry="localhost:30000",
# packages=["pandas", "numpy"],
# apt_packages=['git'],
# python_version=3.11,
# )
# if custom_image.is_container():
# import pandas as pd
@dataclass
class DetailField:
    """Schema of the nested ``Detail`` dict."""

    # NOTE(review): field names are lowercase (age/sex) while the sample
    # DataFrame keys are "Age"/"Sex"; confirm whether column matching is
    # case-sensitive before using this dataclass as a projection.
    age: int
    sex: str


@dataclass
class RecordField:
    """Row schema pairing a name with its detail record."""

    # NOTE(review): appears unused by the Schema variants in this snippet.
    name: str
    detail: DetailField
    # city: str
# Active selection: the flat "Name" column plus the "Age"/"Sex" keys of the
# nested "Detail" column, each given as a positional dict.
Schema = Annotated[
    StructuredDataset,
    kwtypes({"Name": str}, {"Detail": {"Age": int, "Sex": str}}),
]
# Alternative selections tried while testing (swap one in to compare):
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField}, Name=str)]
# Schema = Annotated[StructuredDataset, kwtypes(Name=str, Detail={"Age": int, "Sex": str})]
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": {"Age": int, "Sex": str}})]
# Schema = Annotated[StructuredDataset, kwtypes(Detail=DetailField)]
# Schema = Annotated[StructuredDataset, kwtypes(Name=str)]
# Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField},{"City": str},{"City": str}, Name=str)]
@task
def write_df() -> Schema:
    """Write three rows: flat Name/City columns plus a nested Detail dict."""
    columns = {
        "Name": ["Tom", "Joseph", "Alyssa"],
        "Detail": [
            {"Age": 20, "Sex": "M"},
            {"Age": 22, "Sex": "M"},
            {"Age": 24, "Sex": "F"},
        ],
        "City": ["Madrid", "Paris", "London"],
    }
    return StructuredDataset(dataframe=pd.DataFrame(columns))
@task
def read_df(structds: Schema):
    """Open *structds* as a pandas DataFrame and print it."""
    print(structds.open(pd.DataFrame).all())
@workflow
def struct_record_wf():
    """Write the record table and read it back through ``Schema``."""
    written = write_df()
    read_df(structds=written)


if __name__ == "__main__":
    # Local execution entry point.
    struct_record_wf()
# Debian 10 (buster) has reached end of life and its apt mirrors are no longer
# maintained; bookworm keeps the same Python 3.9 interpreter.
FROM python:3.9-slim-bookworm
USER root
WORKDIR /root
ENV PYTHONPATH=/root
# Refresh the index and install in the SAME layer so the package list is never
# stale from a cached earlier layer, then drop the apt lists to save space.
RUN apt-get update \
    && apt-get install -y build-essential git \
    && rm -rf /var/lib/apt/lists/*
# Pinned flytekit commit carrying the nested-type StructuredDataset changes.
RUN pip install --no-cache-dir -U git+https://github.com/austin362667/flytekit.git@eb4e2c2d25ed57a138a8640fc7f059eebf393241
RUN pip install --no-cache-dir -U pandas
Tested with:
- `Schema = Annotated[StructuredDataset, kwtypes({"Name": str}, {"Detail": {"Age": int, "Sex": str}})]`
- `Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField}, Name=str)]`
- `Schema = Annotated[StructuredDataset, kwtypes({"Detail": DetailField},{"City": str},{"City": str}, Name=str)]`
@gitgraghu, @dylanwilder, please help me check for any unexpected behavior. Thank you! cc @pingsutw
Codecov Report
All modified and coverable lines are covered by tests :white_check_mark:
Project coverage is 97.40%. Comparing base (
bf38b8e) to head (facdeb8). Report is 43 commits behind head on master.
:exclamation: Current head facdeb8 differs from pull request most recent head a6e469a. Consider uploading reports for the commit a6e469a to get more accurate results
Additional details and impacted files
@@ Coverage Diff @@
## master #2252 +/- ##
===========================================
+ Coverage 83.04% 97.40% +14.36%
===========================================
Files 324 9 -315
Lines 24861 231 -24630
Branches 3547 0 -3547
===========================================
- Hits 20645 225 -20420
+ Misses 3591 6 -3585
+ Partials 625 0 -625
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.
@austin362667 it doesn't work for me. Try the example below:
from typing import Annotated
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec
# One record whose "info" value nests a contacts dict two levels deep.
data = {
    "company": "XYZ pvt ltd",
    "location": "London",
    "info": {
        "president": "Rakesh Kapoor",
        "contacts": {
            "email": "[email protected]",
            "tel": "9876543210",
        },
    },
}
# Flat alternative: MyDataset = Annotated[StructuredDataset, kwtypes(company=str)]
# Active: keep only the "president" key of the nested "info" column.
MyDataset = Annotated[
    StructuredDataset,
    kwtypes(info={"president": str}),
]
@task
def create_bq_table() -> StructuredDataset:
    """Normalize the single record into a one-row table and return it."""
    frame = pd.json_normalize(data, max_level=0)
    print("dataframe: \n", frame)
    # To write to BigQuery instead, return:
    #   StructuredDataset(
    #       dataframe=frame, uri="bq://dogfood-gcp-dataplane:dataset.nested_type"
    #   )
    return StructuredDataset(dataframe=frame)
@task
def print_table(sd: MyDataset) -> pd.DataFrame:
    """Read *sd* through the ``MyDataset`` projection, print it, return it."""
    frame = sd.open(pd.DataFrame).all()
    print(frame)
    return frame
@workflow
def wf():
    """Create the table, then print it through ``MyDataset``."""
    table = create_bq_table()
    print_table(sd=table)
> @austin362667 it doesn't work for me. Try the example below:
@pingsutw Thank you for providing these useful use cases.
These examples now work in remote mode.
BigQuery and PyArrow are still the tricky parts.
- Original DataFrame: `data = { 'company': 'XYZ pvt ltd', 'location': 'London', 'info': { 'president': 'Rakesh Kapoor', 'contacts': { 'email': '[email protected]', 'tel': '9876543210' } } }`
- By Dict
- By Dataclass
@gitgraghu @dylanwilder
Feel free to give it a try and check whether it works in your GCP BigQuery scenario. Thank you!
It worked when I passed it in my own Bucket and BigQuery with my GCP account, both in local and remote modes. However, I'm not 100% sure about other cases.
from typing import Annotated
from dataclasses import dataclass
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow, ImageSpec
# Pinned dev build of flytekit; the BigQuery plugin is installed from the
# same commit so the two stay in sync.
flytekit_dev_version = (
    "https://github.com/austin362667/flytekit.git@90a19fc51d1b0eb77b020140810883a317432675"
)
image = ImageSpec(
    packages=[
        "pandas",
        "google-cloud-bigquery",
        "google-cloud-bigquery-storage",
        f"git+{flytekit_dev_version}",
        f"git+{flytekit_dev_version}#subdirectory=plugins/flytekit-bigquery",
    ],
    apt_packages=["git"],
    # NOTE(review): this copies a GCP service-account key into the image, so
    # anyone able to pull the image can read the credential. Fine for a local
    # registry experiment; prefer Flyte secrets for anything shared.
    files=["./keys/gcp-service-account.json"],
    # NOTE(review): relative path, resolved against the container's working
    # directory; confirm it matches where ``files`` places the key.
    env={"GOOGLE_APPLICATION_CREDENTIALS": "./gcp-service-account.json"},
    platform="linux/arm64",
    registry="localhost:30000",
)
# Two company records, each with a nested "info" -> "contacts" dict.
data = [
    {
        "company": "XYZ pvt ltd",
        "location": "London",
        "info": {
            "president": "Rakesh Kapoor",
            "contacts": {
                "email": "[email protected]",
                "tel": "9876543210",
            },
        },
    },
    {
        "company": "ABC pvt ltd",
        "location": "USA",
        "info": {
            "president": "Kapoor Rakesh",
            "contacts": {
                "email": "[email protected]",
                "tel": "0123456789",
            },
        },
    },
]
@dataclass
class ContactsField:
    """Projection of the nested contacts dict that keeps only ``tel``."""

    # email: str
    tel: str


@dataclass
class InfoField:
    """Projection of ``info`` that keeps only the nested contacts."""

    # president: str
    contacts: ContactsField


@dataclass
class CompanyField:
    """Top-level projection that keeps only ``company``."""

    company: str
    # location: str
    # info: InfoField
# Other selections tried while testing:
# MyArgDataset = Annotated[StructuredDataset, kwtypes(company=str)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str})]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(info=InfoField)]
# MyDataClassDataset = Annotated[StructuredDataset, kwtypes(CompanyField)]
# MyDictDataset = Annotated[StructuredDataset, kwtypes(info={"president": str, "contacts":{"email":str}})]

# Active: both aliases select the nested tel field, one via a dict and one via
# a dataclass wrapped in an inner kwtypes().
MyDictDataset = Annotated[
    StructuredDataset,
    kwtypes(info={"contacts": {"tel": str}}),
]
MyDataClassDataset = Annotated[
    StructuredDataset,
    kwtypes(info=kwtypes(contacts=ContactsField)),
]
@task(container_image=image)
def create_bq_table() -> StructuredDataset:
    """Normalize ``data`` into a table and write it to the BigQuery URI below."""
    frame = pd.json_normalize(data, max_level=0)
    print("original dataframe: \n", frame)
    # Alternatives: StructuredDataset(dataframe=frame) for the default store,
    # or uri="gs://flyte_austin362667_bucket/nested_types" for GCS.
    return StructuredDataset(
        dataframe=frame,
        uri="bq://flyte-austin362667-gcp:dataset.nested_type",
    )
@task(container_image=image)
def print_table_by_dict(sd: MyDictDataset) -> pd.DataFrame:
    """Read *sd* through the dict-based projection, print it, return it."""
    frame = sd.open(pd.DataFrame).all()
    print("MyDictDataset dataframe: \n", frame)
    return frame
@task(container_image=image)
def print_table_by_dataclass(sd: MyDataClassDataset) -> pd.DataFrame:
    """Read *sd* through the dataclass-based projection, print it, return it."""
    frame = sd.open(pd.DataFrame).all()
    print("MyDataClassDataset dataframe: \n", frame)
    return frame
@workflow
def wf():
    """Write the table once, then read it through both projections."""
    table = create_bq_table()
    print_table_by_dict(sd=table)
    print_table_by_dataclass(sd=table)
From the Slack discussion:
@dataclass
class Levels:
    """Projection of the ``levels`` dict column that keeps only ``level2``."""

    # level1: str
    level2: str
Schema = Annotated[
    StructuredDataset,
    kwtypes(age=int, levels=Levels),  # age column + the Levels projection
]
@task(container_image=image)
def mytask_w() -> StructuredDataset:
    """Write a two-row table whose ``levels`` column holds dicts."""
    levels = [
        {"level1": "1", "level2": "2"},
        {"level1": "2", "level2": "4"},
    ]
    frame = pd.DataFrame({"age": [1, 2], "levels": levels})
    return StructuredDataset(dataframe=frame)
@task(container_image=image)
def mytask_r(sd: Schema) -> pd.DataFrame:
    """Read the table back through ``Schema``, print it, and return it."""
    frame = sd.open(pd.DataFrame).all()
    print("dataframe: \n", frame)
    return frame
@workflow
def mywf():
    """Write the levels table and read it back through ``Schema``."""
    written = mytask_w()
    mytask_r(sd=written)
-
Local
-
Remote
- Add a `files` field to `ImageSpec()` in order to copy the `gcp_keys.json` file into the container.
- Fix the related test case: the tag hash changed in `./tests/flytekit/unit/cli/pyflyte/test_run.py`.
  - CI failed; to reproduce, run `pytest ./tests/flytekit/unit/cli/pyflyte/test_run.py -vv`.
@pingsutw Should we leverage `FlyteFile`, as mentioned by @Future-Outlier, to avoid modifying `ImageSpec()` by adding a `files` property?
Or should we split the `ImageSpec()` modification into a separate PR (and merge it first, since this PR depends on it)?