flytekit
vaex structured dataset and native types implementation
Vaex performs well on a single machine, which is usually all that most datasets need. This PR adds support for Vaex as a pandas alternative for the StructuredDataset type. We extend StructuredDatasetDecoder and StructuredDatasetEncoder for Vaex, as described in https://docs.flyte.org/projects/cookbook/en/latest/auto/core/type_system/structured_dataset.html
This PR implements automatic serialization and deserialization between consecutive tasks using Parquet, but it could be extended to Arrow, HDF5, or the other binary formats supported by Vaex (https://vaex.readthedocs.io/en/latest/guides/io.html).
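The round trip between consecutive tasks can be sketched without Flyte or Vaex installed. This is a toy model under stated assumptions, not the plugin's code: a plain dict of columns stands in for the Vaex dataframe, JSON stands in for Parquet, and `encode`/`decode` are hypothetical helpers rather than the `StructuredDatasetEncoder.encode`/`StructuredDatasetDecoder.decode` methods the plugin overrides. The encoder writes the data and hands back a location; the decoder reads it back and keeps only the columns the consuming task declares, loosely mirroring the `kwtypes(col2=str)` annotation used in the workflow later in this thread.

```python
import json
import os
import tempfile
from typing import Dict, List, Optional

# Hypothetical stand-in for a dataframe: column name -> list of values.
Table = Dict[str, List]


def encode(table: Table, base_dir: str) -> str:
    """Write the table into a fresh directory and return that directory as its 'URI'.

    JSON stands in for Parquet here; a real handler would use the dataframe
    library's own Parquet writer instead.
    """
    out_dir = tempfile.mkdtemp(dir=base_dir)
    with open(os.path.join(out_dir, "data.json"), "w") as f:
        json.dump(table, f)
    return out_dir


def decode(uri: str, columns: Optional[List[str]] = None) -> Table:
    """Read the table back, keeping only the requested columns when any are given."""
    with open(os.path.join(uri, "data.json")) as f:
        table = json.load(f)
    if columns:
        return {name: table[name] for name in columns}
    return table


# Upstream task output -> storage -> downstream task input, narrowed to col2.
with tempfile.TemporaryDirectory() as store:
    uri = encode({"col1": [1, 3, 2], "col2": ["a", "b", "c"]}, store)
    full = decode(uri)
    subset = decode(uri, columns=["col2"])

print(subset)  # {'col2': ['a', 'b', 'c']}
```

Supporting another format (Arrow, HDF5) would mean adding another encoder/decoder pair for that format rather than changing the tasks themselves.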
Type
- [ ] Bug Fix
- [x] Feature
- [x] Plugin
Are all requirements met?
- [x] Code completed
- [ ] Smoke tested
- [x] Unit tests added
- [x] Code documentation added
- [ ] Any pending items have an associated Issue
Complete description
Added support for the Vaex DataFrame as a type, with a Vaex StructuredDataset encoder and decoder for serialization and deserialization.
Tracking Issue
Fixes https://github.com/flyteorg/flyte/issues/701
Follow-up issue
NA
Thank you for opening this pull request!
These tips will help get your PR across the finish line:
- Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
- Sign off your commits (Reference: DCO Guide).
@ryankarlos, thanks for creating the PR! We'll review it shortly. :)
@samhita-alla thanks - I'm new to Flyte, so it's quite possible I've missed a few things.
@pingsutw Thanks, I have now added a plugin for Vaex.
However, when I try to check that this works by running a simple workflow locally, I get an error, and I'm not sure how to fix it:
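```python
from typing import Annotated  # Python 3.9+; use typing_extensions.Annotated on older versions

import pandas as pd
import vaex
from flytekit import kwtypes, task, workflow
from flytekit.types.structured.structured_dataset import (
    PARQUET,
    StructuredDataset,
    StructuredDatasetTransformerEngine,
)

# VaexDataFrameToParquetEncodingHandlers, ParquetToVaxDataFrameDecodingHandler and
# VaexDataFrameRenderer are the handler classes defined in this PR's vaex plugin.
StructuredDatasetTransformerEngine.register(VaexDataFrameToParquetEncodingHandlers())
StructuredDatasetTransformerEngine.register(ParquetToVaxDataFrameDecodingHandler())
StructuredDatasetTransformerEngine.register_renderer(vaex.DataFrame, VaexDataFrameRenderer())

# Only col2 is declared, so the consuming task should receive just that column.
subset_schema = Annotated[StructuredDataset, kwtypes(col2=str), PARQUET]


@task
def generate() -> subset_schema:
    pd_df = pd.DataFrame({"col1": [1, 3, 2], "col2": list("abc")})
    vaex_df = vaex.from_pandas(pd_df)
    return StructuredDataset(dataframe=vaex_df)


@task
def consume(df: subset_schema) -> subset_schema:
    df = df.open(vaex.DataFrame).all()
    assert df["col2"][0] == "a"
    assert df["col2"][1] == "b"
    assert df["col2"][2] == "c"
    return StructuredDataset(dataframe=df)


@workflow
def wf():
    consume(df=generate())


if __name__ == "__main__":
    wf()
```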
I have already registered the encoding and decoding handlers, so I'm not sure why it's complaining:
```
TypeError: Failed to convert return value for var o0 for function generate with error
<class 'ValueError'>: Failed to find a handler for <class 'vaex.dataframe.DataFrameLocal'>, protocol file, fmt parquet
```
Codecov Report
Merging #1230 (9d174f4) into master (63ad4fc) will increase coverage by 0.07%.
The diff coverage is n/a.
```diff
@@            Coverage Diff             @@
##           master    #1230      +/-   ##
==========================================
+ Coverage   68.57%   68.65%   +0.07%
==========================================
  Files         288      288
  Lines       26224    26351     +127
  Branches     2929     2489     -440
==========================================
+ Hits        17984    18092     +108
- Misses       7762     7779      +17
- Partials      478      480       +2
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| flytekit/deck/deck.py | 34.04% <0.00%> (-4.26%) | :arrow_down: |
| flytekit/clis/sdk_in_container/register.py | 79.68% <0.00%> (-3.08%) | :arrow_down: |
| flytekit/types/structured/structured_dataset.py | 60.74% <0.00%> (-2.58%) | :arrow_down: |
| flytekit/types/directory/types.py | 54.16% <0.00%> (-0.84%) | :arrow_down: |
| ...ctured_dataset/test_structured_dataset_workflow.py | 99.24% <0.00%> (-0.76%) | :arrow_down: |
| flytekit/core/type_engine.py | 58.89% <0.00%> (-0.50%) | :arrow_down: |
| flytekit/core/local_cache.py | 46.66% <0.00%> (-0.40%) | :arrow_down: |
| flytekit/clis/sdk_in_container/helpers.py | 92.59% <0.00%> (-0.27%) | :arrow_down: |
| flytekit/clis/sdk_in_container/run.py | 84.15% <0.00%> (-0.04%) | :arrow_down: |
| plugins/setup.py | 0.00% <0.00%> (ø) | |
| ... and 22 more | | |
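The Coverage Diff figures can be checked by hand. A minimal sketch, assuming Codecov's usual definition (coverage = hits / (hits + misses + partials)) and its default of two decimal places rounded down; the helper names `coverage_pct` and `round_down` are illustrative only. It reproduces 68.57%, 68.65% and the +0.07% change, and shows that the delta comes from the unrounded values, which is why it reads +0.07% rather than the 0.08 you would get by subtracting the displayed figures.

```python
import math
from fractions import Fraction


def coverage_pct(hits: int, misses: int, partials: int) -> Fraction:
    """Exact coverage percentage: hits over all tracked lines (hits + misses + partials)."""
    return Fraction(100 * hits, hits + misses + partials)


def round_down(value: Fraction, places: int = 2) -> Fraction:
    """Round toward negative infinity at the given number of decimal places."""
    scale = 10 ** places
    return Fraction(math.floor(value * scale), scale)


base = coverage_pct(hits=17984, misses=7762, partials=478)  # master (63ad4fc), 26224 lines
head = coverage_pct(hits=18092, misses=7779, partials=480)  # #1230 (9d174f4), 26351 lines

for label, value in [("master", base), ("#1230", head), ("delta", head - base)]:
    print(f"{label}: {float(round_down(value)):.2f}%")
# master: 68.57%
# #1230: 68.65%
# delta: 0.07%
```

Exact fractions are used so that floating-point error cannot push a value across a rounding boundary.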
@ryankarlos, it seems like the Vaex dataframe type is `vaex.dataframe.DataFrameLocal` rather than `vaex.dataframe`. Your transformer handles the latter case, not the former. Can you re-verify what the type of a Vaex dataframe is?
Ah yes, thanks - I've fixed it now.
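The mismatch diagnosed above can be reproduced with a toy registry. This is a simplified model, not flytekit's implementation: it assumes, as the error message suggests, that handlers are matched on the exact runtime type plus protocol and format, and it uses two hypothetical classes that mirror Vaex's hierarchy (`vaex.from_pandas` returns a `DataFrameLocal`, which subclasses the generic `DataFrame`). A handler registered only for the base class is never found for the subclass instance; registering it for the concrete class resolves the lookup.

```python
from typing import Callable, Dict, Tuple


# Hypothetical stand-ins mirroring Vaex's class hierarchy.
class DataFrame:
    pass


class DataFrameLocal(DataFrame):
    pass


Handler = Callable[[object], str]
# Toy registry keyed by (exact Python type, protocol, format).
ENCODERS: Dict[Tuple[type, str, str], Handler] = {}


def register(python_type: type, protocol: str, fmt: str, handler: Handler) -> None:
    ENCODERS[(python_type, protocol, fmt)] = handler


def find_encoder(value: object, protocol: str, fmt: str) -> Handler:
    # Dict lookup on type(value) with no isinstance() fallback, so a handler
    # registered for a base class is not found for an instance of a subclass.
    key = (type(value), protocol, fmt)
    if key not in ENCODERS:
        raise ValueError(
            f"Failed to find a handler for {type(value)}, protocol {protocol}, fmt {fmt}"
        )
    return ENCODERS[key]


df = DataFrameLocal()

# Handler registered for the base class only: lookup by runtime type fails.
register(DataFrame, "file", "parquet", lambda v: "encoded")
try:
    find_encoder(df, "file", "parquet")
    before_fix = "found"
except ValueError as err:
    before_fix = str(err)

# Handler registered for the concrete runtime class: lookup succeeds.
register(DataFrameLocal, "file", "parquet", lambda v: "encoded")
after_fix = find_encoder(df, "file", "parquet")(df)

print(before_fix)
print(after_fix)  # encoded
```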
Thank you @ryankarlos. LGTM
@samhita-alla pushed requested changes now.
@pingsutw, +1 again, please?
Congrats on merging your first pull request!