Out of Core Processing
This might be a complex issue to tackle, but are there any plans for out of core processing? These days it is fairly common to deal with huge larger-than-memory datasets, so it would be very nice to have a faster alternative to the likes of Dask.
This is what Spark is for. Scope and feature creep is a sure way to throw a project off the track. Might be for polars-2.0 😄
See #3379. Polars itself will not do OOC processing.
We might produce distribution plans that could be executed by polars on ray or dask.
It may be useful to contribute a `from_polars` function in ray.
The link is broken for me. Sharing the link that works for me.
I stumbled on Polars when looking for an alternative to dask.dataframe. Although, according to this thread, it's not quite what I was looking for, I'm massively impressed by the performance and the clean API. Because of the latter alone I wish I could ditch all my pandas/dask code and never look back :)
I'd like to share my thoughts on slonik's comment that "This is what Spark is for." I would say there is still a gap in the current python datascience stack when people want to process data on a single node when the data is too large for memory and/or they want to utilise all cores on their machine. In this case people don't necessarily want to go to the trouble of setting up and managing a Spark cluster. (At my company we're in exactly this situation on several projects.)
polars actually solves 50% of the problem, since it is much better at utilising all available cores. So, for "local" operations (where you only need one partition at a time), using polars with a simple for loop over the partitions actually does the trick. In my experience the operations which are feasible on a single node are either local or 'almost local'. By the latter I mean things like a join between two datasets on a column that both datasets are partitioned by. If the boundaries of the partitions are known, you then don't need to iterate over all possible pairs of partitions.
So, I'm wondering whether a small set of helper functions/classes (outside of polars.DataFrame, maybe outside of polars entirely) that make it easier to deal with partitioned data (keeping track of bounds, doing filtering or joins) would be enough to close the gap.
Apologies for the rambling post, but I'd be interested in hearing your opinions.
@mwiebusch78 this thread has not aged well: polars currently has partial support for out-of-core operations, and we will try to expand this functionality to more operations.
To use polars on larger-than-RAM datasets, try `collect(streaming=True)`. There are quite a few operations we can already do OOC.
Ah, that's great to know. How is `collect(streaming=True)` supposed to work? Do you get an iterator over frames instead of a single frame? I've tried this:

```python
import polars as pl
data = pl.scan_parquet('data/*.parquet')
print(data.collect(streaming=True))
```

but I'm getting this weird error:

```
thread '<unnamed>' panicked at 'called `Option::unwrap()` on a `None` value', /Users/runner/work/polars/polars/polars/polars-utils/src/arena.rs:80:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/Users/wiebuschm/Projects/ml-fraud/notebooks/scratch2.py", line 4, in <module>
    print(data.collect(streaming=True))
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/utils.py", line 310, in wrapper
    return fn(*args, **kwargs)
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/internals/lazyframe/frame.py", line 1164, in collect
    return pli.wrap_df(ldf.collect())
pyo3_runtime.PanicException: called `Option::unwrap()` on a `None` value
```

That's on polars 0.15.2.
I'm also curious what your plans are for the kinds of joins that I described. (Where both datasets are partitioned by one or more of the columns that you join on.) Are you currently doing any optimisations for those situations? If not, are you planning to do so?
@martin-wiebusch-thg I have similar use cases to the ones you described, and found DuckDB to be an interesting option. It can join across partitioned parquet datasets, use all the cores, and I believe it does out-of-core processing.
> Ah, that's great to know. How is `collect(streaming=True)` supposed to work? Do you get an iterator over frames instead of a single frame? [...] That's on polars 0.15.2.
This is a bug. But typically you want to reduce the size of the collected result with a limit, group-by, or filter, as the full result may not fit in memory.
@unidesigner polars can also do these operations on all cores and out of core.
Sorry if I'm being a bit slow here, but what's the difference between `.collect(streaming=True)` and `.collect(streaming=False)` if all the data gets pulled into memory in both cases?

And how would you handle situations where the result is too big for memory but you just want to write the result to disk one partition at a time? `LazyFrame` doesn't seem to have any write methods.
> Sorry if I'm being a bit slow here, but what's the difference between `.collect(streaming=True)` and `.collect(streaming=False)` if all the data gets pulled into memory in both cases?
Yeah :/ I don't understand `streaming` in the collect method either. Surely if you specify streaming, `collect` must return a generator/iterator?
```
%memit pq = pl.scan_parquet("./polars.parquet").collect(streaming=True)
peak memory: 2391.88 MiB, increment: 2302.32 MiB
```
^ It's just loaded the whole 2GiB thing into RAM. Shouldn't we be specifying a chunk size or something in which to stream out? I dunno, "Run parts of the query in a streaming fashion" - this doesn't illuminate a whole lot to me. Any further info would be appreciated!
FYI, I have created a small helper package in Python which uses polars to handle partitioned data. It handles the (re)partitioning of the data and lets you slice and join datasets on the columns by which the data is partitioned (without loading the full data into memory, of course). I'm trying to keep it light-weight, so there is no attempt to re-implement the polars API (in the way dask.dataframe does it for pandas). Data-local operations (i.e. operations that only need one partition at a time) are supported by letting the user map an arbitrary function over the partitions (like Dask's map_partitions). The package is on PyPI (https://pypi.org/project/padawan/) and there's (basic) documentation on https://padawan.readthedocs.io/en/latest/.
> ^ It's just loaded the whole 2GiB thing into RAM. Shouldn't we be specifying a chunk size or something in which to stream out?
You collect to memory. Use `sink_parquet` to stream to disk.
> Sorry if I'm being a bit slow here, but what's the difference between `.collect(streaming=True)` and `.collect(streaming=False)` if all the data gets pulled into memory in both cases?
Disclaimer: I'm not really familiar with the internal design of polars, so this answer might not be correct.

With both APIs the final result is collected in memory; the difference is in the intermediate results during query execution. If your intermediate results are larger than the final result, you'll see lower peak memory usage with streaming execution.
A simple example: `lazyframe.filter(...).map(...).collect(streaming=True)` should use less memory than `lazyframe.filter(...).map(...).collect(streaming=False)`, because the streaming version does not have to materialize the result of the `filter` operation in memory.
Not sure this is the right place to ask, but are there any plans to support `scan_` methods on byte streams? It looks like `scan_parquet` only supports a path at the moment, but it would be very handy to support the byte-stream level as well.
> Not sure this is the right place to ask, but are there any plans to support `scan_` methods on byte streams?
I am quite sure that there is already a feature request for that.
I will close this thread as the streaming API supports OOC processing.
I saw this issue was closed as completed. For those of us curious to find the PRs that close this issue, I've pulled a few. I'm sure there are more!
- https://github.com/pola-rs/polars/pull/8573
- https://github.com/pola-rs/polars/pull/7630
- https://github.com/pola-rs/polars/pull/7604
- https://github.com/pola-rs/polars/pull/7244