kedro-plugins

polars.LazyPolarsDataset .collect() streaming

Open butterlyn opened this issue 1 year ago • 4 comments

Description

Enable Polars streaming by default when saving polars.LazyPolarsDataset

Context

Enables larger-than-memory data processing, one of the main advantages of using Polars LazyFrames.

Possible Implementation

.collect(streaming=True) in polars.LazyPolarsDataset

If streaming cannot be performed for whatever reason, Polars disables streaming automatically at runtime, so having streaming as the default behaviour should be okay.

Possible Alternatives

Add a flag to enable/disable streaming through data catalog load_args. However, this may be problematic given streaming is not an argument of LazyFrame.sink_csv(), but rather an argument of LazyFrame.collect().

butterlyn avatar Jan 21 '24 06:01 butterlyn

Thanks @butterlyn for this feature request!

I'm going to suggest a third alternative, which is adding a dataset-level property, like this:

ds:
    type: polars.LazyPolarsDataset
    streaming: true

How does that sound?

astrojuanlu avatar Jan 24 '24 11:01 astrojuanlu

@astrojuanlu Love the idea! That'd be perfect

butterlyn avatar Jan 24 '24 11:01 butterlyn

This one is actually easy I'd say :) It requires adding a new argument to the initialiser:

https://github.com/kedro-org/kedro-plugins/blob/a88ad7f4fef9ad410eefa4cc178f8ed4e645bed7/kedro-datasets/kedro_datasets/polars/lazy_polars_dataset.py#L76-L81

And then storing it in an internal property, and using it where appropriate.
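
A minimal sketch of that change, not the actual kedro-datasets code. The class name `LazyDatasetSketch` and the Parquet-only `save` body are hypothetical. The only Polars calls assumed are `LazyFrame.collect(streaming=...)`, as it existed in the Polars releases discussed in this thread, and `DataFrame.write_parquet()`. The class is duck-typed, so it does not import Polars itself.

```python
from typing import Any


class LazyDatasetSketch:
    """Hypothetical stand-in for a lazy dataset with a dataset-level streaming flag."""

    def __init__(self, filepath: str, streaming: bool = False) -> None:
        self._filepath = filepath
        # The new initialiser argument, stored in an internal property.
        self._streaming = streaming

    def save(self, data: Any) -> None:
        # `data` is expected to behave like a polars.LazyFrame.
        # Assumption: collect() accepts a `streaming` keyword argument.
        collected = data.collect(streaming=self._streaming)
        # Only Parquet is shown here to keep the sketch short.
        collected.write_parquet(self._filepath)
```

With a catalog entry like the `streaming: true` example earlier in the thread, the flag would presumably reach the initialiser as `streaming=True`, since Kedro passes the catalog entry's keys to the dataset constructor.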

astrojuanlu avatar Jan 24 '24 12:01 astrojuanlu

In fact, I'm thinking: rather than using .collect() and then .write_*, shouldn't we use .sink_* directly? cc @cpinon-grd (comes from https://linen-slack.kedro.org/t/16374083/hey-team-is-there-any-way-to-store-a-lazypolarsdataframe-wit#76e13870-de5a-4a1d-86c3-f0c30f2ebf25)
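
The two save paths being compared can be sketched roughly as follows. `save_eager` and `save_sink` are hypothetical helper names, and only Parquet is shown. The Polars methods assumed are `LazyFrame.collect()`, `DataFrame.write_parquet()` and `LazyFrame.sink_parquet()`. The helpers are duck-typed and do not import Polars.

```python
from typing import Any


def save_eager(lazy_frame: Any, path: str) -> None:
    # Materialise the whole result as a DataFrame in memory, then write it out.
    lazy_frame.collect().write_parquet(path)


def save_sink(lazy_frame: Any, path: str) -> None:
    # Ask Polars to run the query and write the result to disk itself,
    # without handing a DataFrame back to Python first.
    lazy_frame.sink_parquet(path)
```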

astrojuanlu avatar Feb 06 '24 08:02 astrojuanlu

As per my comment here, I wouldn't recommend using streaming or sink_* methods. Even when using .collect(streaming=True), it is explicitly mentioned in the docs that streaming mode is considered unstable.

MatthiasRoels avatar Jun 27 '24 17:06 MatthiasRoels

Streaming functionality is indeed considered unstable: https://github.com/pola-rs/polars/pull/13948

But as far as I understand, sink_* methods in non-streaming mode are okay?

astrojuanlu avatar Jun 27 '24 20:06 astrojuanlu

Let's close this issue in favour of #702: no streaming=True, but let's continue the discussion on using the lazy methods for LazyPolarsDataset.

astrojuanlu avatar Jun 27 '24 20:06 astrojuanlu

Hey! If I'm not wrong, processing larger than memory datasets is one of the key features of Polars. Polars docs state:

With the lazy API Polars doesn't run each query line-by-line but instead processes the full query end-to-end. To get the most out of Polars it is important that you use the lazy API because:

  • the lazy API allows Polars to apply automatic query optimization with the query optimizer
  • the lazy API allows you to work with larger than memory datasets using streaming
  • the lazy API can catch schema errors before processing the data

Isn't it a bit weird that in order to "get the most out of Polars", the Polars team recommends an unstable solution? If using streaming mode is unstable, what is the "recommended"/"your go to" solution?

cpinon-grd avatar Jun 28 '24 07:06 cpinon-grd

If you use the Lazy API, you already get some optimisations such as predicate and projection pushdown. This means that you only read the rows/columns you need into memory (as opposed to the full dataset).
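
As a rough analogy only, written in plain Python with the standard `csv` module rather than Polars: the hypothetical `scan_rows` helper below applies the row filter and the column selection while reading, so only the needed values are kept. It does not reproduce Polars' optimiser. `DictReader` still parses every field of each line, whereas a real scan with pushdown can skip that work at the reader.

```python
import csv
import io
from typing import Callable, Iterable, Iterator


def scan_rows(
    source: Iterable[str],
    columns: list[str],
    keep: Callable[[dict], bool],
) -> Iterator[dict]:
    """Yield only the requested columns of the rows that pass the filter."""
    for row in csv.DictReader(source):
        if keep(row):  # row filter applied during the read
            # Column selection: drop everything the caller did not ask for.
            yield {name: row[name] for name in columns}


raw = io.StringIO("id,city,amount\n1,Lyon,10\n2,Gent,25\n3,Lyon,40\n")
result = list(scan_rows(raw, ["id", "amount"], lambda r: r["city"] == "Lyon"))
```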

MatthiasRoels avatar Jun 28 '24 07:06 MatthiasRoels