
vaex structured dataset and native types implementation

Open · ryankarlos opened this issue 2 years ago · 7 comments

Vaex performs well on a single machine, which is all that most datasets need. This PR adds support for Vaex as a pandas alternative for the StructuredDataset type. We extend StructuredDatasetDecoder and StructuredDatasetEncoder for Vaex, as described in https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/structured_dataset.html

This PR implements automatic serialization and deserialization between consecutive tasks using Parquet, but it could be extended to Arrow, HDF5, or the other binary formats supported by Vaex: https://vaex.readthedocs.io/en/latest/guides/io.html
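One way the format extension could look, as a minimal sketch: a table mapping a format name to the corresponding Vaex export method, so that an encoder picks its writer from the requested format. It relies on Vaex DataFrames exposing export_parquet, export_arrow and export_hdf5. The format names "arrow" and "hdf5", as well as EXPORT_METHODS and export_dataframe, are hypothetical illustrations, not part of flytekit or of this PR (which only registers Parquet).

```python
from typing import Dict

# Format name -> name of the vaex DataFrame method that writes that format.
# Only "parquet" is handled by this PR; "arrow" and "hdf5" are hypothetical extensions.
EXPORT_METHODS: Dict[str, str] = {
    "parquet": "export_parquet",
    "arrow": "export_arrow",
    "hdf5": "export_hdf5",
}


def export_dataframe(df: object, path: str, fmt: str) -> None:
    """Write df to path using the export method registered for fmt."""
    if fmt not in EXPORT_METHODS:
        raise ValueError(f"Unsupported format {fmt!r}; expected one of {sorted(EXPORT_METHODS)}")
    # Resolve the method on the dataframe itself, so any object exposing the same
    # export_* methods (for example a test double) can be passed in.
    getattr(df, EXPORT_METHODS[fmt])(path)
```

With a real Vaex frame this would be called as export_dataframe(vaex_df, "data.hdf5", "hdf5"). On the reading side, vaex.open chooses a reader from the file extension, so a matching decoder should be able to stay largely format-agnostic.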

Type

  • [ ] Bug Fix
  • [x] Feature
  • [x] Plugin

Are all requirements met?

  • [x] Code completed
  • [ ] Smoke tested
  • [x] Unit tests added
  • [x] Code documentation added
  • [ ] Any pending items have an associated Issue

Complete description

Added support for the Vaex DataFrame as a type, with a Vaex StructuredDataset encoder and decoder for serialisation and deserialisation.

Tracking Issue

Fixes https://github.com/flyteorg/flyte/issues/701

Follow-up issue

NA OR https://github.com/flyteorg/flyte/issues/

ryankarlos, Oct 09 '22 23:10

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

welcome[bot], Oct 09 '22 23:10

@ryankarlos, thanks for creating the PR! We'll review it shortly. :)

samhita-alla, Oct 10 '22 13:10

@samhita-alla Thanks! I'm new to Flyte, so it's quite possible I've missed a few things.

ryankarlos, Oct 10 '22 23:10

@pingsutw Thanks, I have now added a plugin for Vaex.

However, when I try to verify that this works by running a simple workflow locally, I get an error, and I'm not sure how to fix it:

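import pandas as pd
import vaex
from typing_extensions import Annotated

from flytekit import kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetTransformerEngine,
)

# The handler and renderer classes below are defined in this PR's Vaex plugin (import omitted)
StructuredDatasetTransformerEngine.register(VaexDataFrameToParquetEncodingHandlers())
StructuredDatasetTransformerEngine.register(ParquetToVaxDataFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(vaex.DataFrame, VaexDataFrameRenderer())

# Declares only col2 (column subsetting), stored as Parquet
subset_schema = Annotated[StructuredDataset, kwtypes(col2=str), PARQUET]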

@task
def generate() -> subset_schema:
    pd_df = pd.DataFrame({"col1": [1, 3, 2], "col2": list("abc")})
    vaex_df = vaex.from_pandas(pd_df)
    return StructuredDataset(dataframe=vaex_df)

@task
def consume(df: subset_schema) -> subset_schema:
    df = df.open(vaex.DataFrame).all()
    assert df["col2"][0] == "a"
    assert df["col2"][1] == "b"
    assert df["col2"][2] == "c"
    return StructuredDataset(dataframe=df)

@workflow
def wf():
    consume(df=generate())

if __name__ == "__main__":
    wf()

I have already registered the encoding and decoding handlers, so I'm not sure why it is complaining:

TypeError: Failed to convert return value for var o0 for function generate with error 
<class 'ValueError'>: Failed to find a handler for <class 'vaex.dataframe.DataFrameLocal'>, 
protocol file, fmt parquet

ryankarlos, Oct 15 '22 04:10
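In the snippet above, subset_schema declares only col2 through kwtypes(col2=str), so the decoder for the consuming task is expected to hand back just that column. A minimal sketch of that projection step follows, using a plain dict as a hypothetical stand-in for the dataframe; project_columns is an illustrative name, and with Vaex the equivalent step would presumably be selecting df[columns] on the opened file (an assumption, not code from this PR).

```python
from typing import Dict, List, Optional, Sequence

# Hypothetical stand-in for a dataframe: column name -> column values.
Table = Dict[str, List[object]]


def project_columns(table: Table, requested: Optional[Sequence[str]]) -> Table:
    """Keep only the requested columns, in the requested order.

    `requested` plays the role of the names declared via kwtypes(...);
    None or an empty sequence means no subset was declared, so all columns are kept.
    """
    if not requested:
        return dict(table)
    missing = [name for name in requested if name not in table]
    if missing:
        raise KeyError(f"Columns not present in the dataset: {missing}")
    return {name: table[name] for name in requested}


written = {"col1": [1, 3, 2], "col2": ["a", "b", "c"]}
print(project_columns(written, ["col2"]))  # {'col2': ['a', 'b', 'c']}
```

Failing loudly on an undeclared column, rather than silently dropping it, keeps a schema mismatch between producer and consumer visible at the task boundary.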

Codecov Report

Merging #1230 (9d174f4) into master (63ad4fc) will increase coverage by 0.07%. The diff coverage is n/a.

@@            Coverage Diff             @@
##           master    #1230      +/-   ##
==========================================
+ Coverage   68.57%   68.65%   +0.07%     
==========================================
  Files         288      288              
  Lines       26224    26351     +127     
  Branches     2929     2489     -440     
==========================================
+ Hits        17984    18092     +108     
- Misses       7762     7779      +17     
- Partials      478      480       +2     
Impacted Files                                          Coverage Δ
flytekit/deck/deck.py                                   34.04% <0.00%> (-4.26%) ↓
flytekit/clis/sdk_in_container/register.py              79.68% <0.00%> (-3.08%) ↓
flytekit/types/structured/structured_dataset.py         60.74% <0.00%> (-2.58%) ↓
flytekit/types/directory/types.py                       54.16% <0.00%> (-0.84%) ↓
...ctured_dataset/test_structured_dataset_workflow.py   99.24% <0.00%> (-0.76%) ↓
flytekit/core/type_engine.py                            58.89% <0.00%> (-0.50%) ↓
flytekit/core/local_cache.py                            46.66% <0.00%> (-0.40%) ↓
flytekit/clis/sdk_in_container/helpers.py               92.59% <0.00%> (-0.27%) ↓
flytekit/clis/sdk_in_container/run.py                   84.15% <0.00%> (-0.04%) ↓
plugins/setup.py                                         0.00% <0.00%> (ø)
... and 22 more


codecov[bot], Oct 16 '22 05:10

@ryankarlos, it seems like the Vaex dataframe type is vaex.dataframe.DataFrameLocal rather than vaex.DataFrame. Your transformer handles the latter, not the former. Can you re-verify what the type of the Vaex dataframe is?

samhita-alla, Oct 16 '22 06:10
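The diagnosis above can be illustrated with a small, self-contained model of a handler registry. It assumes, consistent with the error message in this thread, that the transformer engine matches handlers on the exact Python type of the returned value. The classes and functions here (DataFrame, DataFrameLocal, register, get_encoder) are hypothetical stand-ins, not flytekit's actual implementation.

```python
from typing import Callable, Dict, Tuple


# Hypothetical stand-ins mirroring vaex's hierarchy: DataFrameLocal subclasses
# DataFrame, and vaex.from_pandas(...) returns a DataFrameLocal.
class DataFrame:
    pass


class DataFrameLocal(DataFrame):
    pass


Encoder = Callable[[object], str]

# Handlers keyed by (Python type, protocol, format).
ENCODERS: Dict[Tuple[type, str, str], Encoder] = {}


def register(python_type: type, protocol: str, fmt: str, encoder: Encoder) -> None:
    ENCODERS[(python_type, protocol, fmt)] = encoder


def get_encoder(df: object, protocol: str, fmt: str) -> Encoder:
    # type(df) is the concrete class of the value; a plain dict lookup does not
    # walk the class hierarchy, so a handler registered for a base class is missed.
    key = (type(df), protocol, fmt)
    if key not in ENCODERS:
        raise ValueError(
            f"Failed to find a handler for {type(df)}, protocol {protocol}, fmt {fmt}"
        )
    return ENCODERS[key]


# Registering only the base class reproduces the kind of error seen in this thread...
register(DataFrame, "file", "parquet", lambda df: "base handler")
try:
    get_encoder(DataFrameLocal(), "file", "parquet")
except ValueError as err:
    print(err)

# ...while registering the concrete subclass resolves it.
register(DataFrameLocal, "file", "parquet", lambda df: "local handler")
print(get_encoder(DataFrameLocal(), "file", "parquet")(None))  # local handler
```

Registering the concrete vaex.dataframe.DataFrameLocal type is the simplest remedy. A registry could instead search type(df).__mro__ for a match, at the cost of making handler resolution less explicit.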

Ah yes, thanks, I've fixed it now.

ryankarlos, Oct 18 '22 12:10

Thank you @ryankarlos. LGTM

pingsutw, Oct 19 '22 18:10

@samhita-alla I've pushed the requested changes now.

ryankarlos, Oct 24 '22 11:10

@pingsutw, +1 again, please?

samhita-alla, Oct 25 '22 12:10

Congrats on merging your first pull request! 🎉

welcome[bot], Oct 28 '22 04:10