
Out of Core Processing

Open zareami10 opened this issue 2 years ago • 12 comments

This might be a complex issue to tackle, but are there any plans for out of core processing? These days it is fairly common to deal with huge larger-than-memory datasets, so it would be very nice to have a faster alternative to the likes of Dask.

zareami10 avatar May 13 '22 14:05 zareami10

This is what Spark is for. Scope and feature creep is a sure way to throw a project off track. Might be for polars-2.0 😄

slonik-az avatar May 13 '22 18:05 slonik-az

See #3379. Polars itself will not do OOC processing.

We might produce distribution plans that could be executed by polars on ray or dask.

ritchie46 avatar May 13 '22 18:05 ritchie46

It may be useful to contribute a from_polars function in ray

unidesigner avatar Jul 20 '22 13:07 unidesigner

It may be useful to contribute a from_polars function in ray

The link is broken for me. Sharing the link that works for me

nitesh-jaswal avatar Sep 14 '22 01:09 nitesh-jaswal

I stumbled on Polars when looking for an alternative to dask.dataframe. Although, according to this thread, it's not quite what I was looking for, I'm massively impressed by the performance and the clean API. Because of the latter alone I wish I could ditch all my pandas/dask code and never look back :)

I'd like to share my thoughts on slonik's comment that "This is what Spark is for." I would say there is still a gap in the current Python data science stack for people who want to process data on a single node when the data is too large for memory and/or they want to utilise all cores on their machine. In that case people don't necessarily want to go to the trouble of setting up and managing a Spark cluster. (At my company we're in exactly this situation on several projects.)

polars actually solves 50% of the problem, since it is much better at utilising all available cores. So, for "local" operations (where you only need one partition at a time), using polars with a simple for loop over the partitions actually does the trick. In my experience, the operations which are feasible on a single node are either local or 'almost local'. By the latter I mean things like a join between two datasets on a column that both datasets are partitioned by. If the boundaries of the partitions are known, you then don't need to iterate over all possible pairs of partitions.
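
For illustration, the kind of per-partition loop I mean looks something like this (file layout and column names are made up; on older polars versions group_by is spelled groupby):

import polars as pl
from pathlib import Path

# One parquet file per partition; processing them one at a time means
# only a single partition is ever held in memory.
for part in sorted(Path("data").glob("*.parquet")):
    df = pl.read_parquet(part)
    out = df.group_by("key").agg(pl.col("value").sum())
    out.write_parquet(Path("out") / part.name)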

So I'm wondering if a small set of helper functions/classes (outside of polars.DataFrame, maybe outside of polars entirely) that make it easier to deal with partitioned data (keeping track of bounds, doing filtering or joins) would be enough to close the gap.

Apologies for the rambling post, but I'd be interested in hearing your opinions.

mwiebusch78 avatar Dec 07 '22 15:12 mwiebusch78

@mwiebusch78 this thread has not aged well: polars currently has partial support for out-of-core operations and will try to expand this functionality to more operations.

To use polars on larger-than-RAM datasets, try collect(streaming=True). There are quite a few operations we can already do OOC.
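
For example, something along these lines (path and column names hypothetical):

import polars as pl

# The streaming engine processes the scan in batches, so the full input
# never has to fit in memory; only the aggregated result does.
out = (
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("amount") > 0)
    .group_by("key")
    .agg(pl.col("amount").sum())
    .collect(streaming=True)
)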

ritchie46 avatar Dec 07 '22 16:12 ritchie46

Ah, that's great to know. How is collect(streaming=True) supposed to work? Do you get an iterator over frames instead of a single frame? I've tried this:

import polars as pl
data = pl.scan_parquet('data/*.parquet')
print(data.collect(streaming=True))

but I'm getting this weird error:

thread '<unnamed>' panicked at 'called `Option::unwrap()` on a `None` value', /Users/runner/work/polars/polars/polars/polars-utils/src/arena.rs:80:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/Users/wiebuschm/Projects/ml-fraud/notebooks/scratch2.py", line 4, in <module>
    print(data.collect(streaming=True))
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/utils.py", line 310, in wrapper
    return fn(*args, **kwargs)
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/internals/lazyframe/frame.py", line 1164, in collect
    return pli.wrap_df(ldf.collect())
pyo3_runtime.PanicException: called `Option::unwrap()` on a `None` value

That's on polars 0.15.2

martin-wiebusch-thg avatar Dec 07 '22 18:12 martin-wiebusch-thg

I'm also curious what your plans are for the kinds of joins that I described. (Where both datasets are partitioned by one or more of the columns that you join on.) Are you currently doing any optimisations for those situations? If not, are you planning to do so?

martin-wiebusch-thg avatar Dec 07 '22 18:12 martin-wiebusch-thg

@martin-wiebusch-thg I have similar use cases to the ones you described, and found DuckDB to be an interesting option. It can join across partitioned parquet datasets, use all the cores, and, I believe, does out-of-core processing.

unidesigner avatar Dec 07 '22 19:12 unidesigner

Ah, that's great to know. How is collect(streaming=True) supposed to work? Do you get an iterator over frames instead of a single frame? I've tried this:

import polars as pl
data = pl.scan_parquet('data/*.parquet')
print(data.collect(streaming=True))

but I'm getting this weird error:

thread '<unnamed>' panicked at 'called `Option::unwrap()` on a `None` value', /Users/runner/work/polars/polars/polars/polars-utils/src/arena.rs:80:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/Users/wiebuschm/Projects/ml-fraud/notebooks/scratch2.py", line 4, in <module>
    print(data.collect(streaming=True))
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/utils.py", line 310, in wrapper
    return fn(*args, **kwargs)
  File "/Users/wiebuschm/.pyenv/versions/general/lib/python3.9/site-packages/polars/internals/lazyframe/frame.py", line 1164, in collect
    return pli.wrap_df(ldf.collect())
pyo3_runtime.PanicException: called `Option::unwrap()` on a `None` value

That's on polars 0.15.2

This is a bug. But typically you want to reduce your collect size with a limit, group by, or filter, as the full result may not fit in memory.
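
For example (path hypothetical):

import polars as pl

# Shrink the result before collecting so it fits in memory.
preview = (
    pl.scan_parquet("data/*.parquet")
    .limit(1000)  # or a filter / group by that reduces the output
    .collect(streaming=True)
)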

@unidesigner polars can also do these operations on all cores and out of core.

ritchie46 avatar Dec 07 '22 19:12 ritchie46

Sorry if I'm being a bit slow here, but what's the difference between .collect(streaming=True) and .collect(streaming=False) if all the data gets pulled into memory in both cases?

And how would you handle situations where the result is too big for memory but you just want to write it to disk one partition at a time? LazyFrame doesn't seem to have any write methods.

martin-wiebusch-thg avatar Dec 08 '22 09:12 martin-wiebusch-thg

Sorry if I'm being a bit slow here, but what's the difference between .collect(streaming=True) and .collect(streaming=False) if all the data gets pulled into memory in both cases?

Yeah :/ I don't understand streaming in the collect method either. Surely if you specify streaming, collect must return a generator/iterator?

%memit pq = pl.scan_parquet("./polars.parquet").collect(streaming=True)
peak memory: 2391.88 MiB, increment: 2302.32 MiB

^ It's just loaded the whole 2GiB thing into RAM. Shouldn't we be specifying a chunk size or something in which to stream out? I dunno, "Run parts of the query in a streaming fashion" - this doesn't illuminate a whole lot to me. Any further info would be appreciated!

StuartHadfield avatar Dec 30 '22 13:12 StuartHadfield

FYI, I have created a small helper package in Python which uses polars to handle partitioned data. It handles the (re)partitioning of the data and lets you slice and join datasets on the columns by which the data is partitioned (without loading the full data into memory, of course). I'm trying to keep it lightweight, so there is no attempt to re-implement the polars API (in the way dask.dataframe does it for pandas). Data-local operations (i.e. operations that only need one partition at a time) are supported by letting the user map an arbitrary function over the partitions (like Dask's map_partitions). The package is on PyPI (https://pypi.org/project/padawan/) and there's (basic) documentation at https://padawan.readthedocs.io/en/latest/.

mwiebusch78 avatar Feb 14 '23 14:02 mwiebusch78

^ It's just loaded the whole 2GiB thing into RAM. Shouldn't we be specifying a chunk size or something in which to stream out? I dunno, "Run parts of the query in a streaming fashion" - this doesn't illuminate a whole lot to me. Any further info would be appreciated!

You collect to memory. Use sink_parquet to stream to disk.
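
For example (paths and column hypothetical):

import polars as pl

# sink_parquet runs the query with the streaming engine and writes the
# result to disk without materializing it in memory.
(
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("amount") > 0)
    .sink_parquet("out.parquet")
)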

ritchie46 avatar Feb 21 '23 07:02 ritchie46

Sorry if I'm being a bit slow here, but what's the difference between .collect(streaming=True) and .collect(streaming=False) if all the data gets pulled into memory in both cases?

Disclaimer: I'm not really familiar with the internal design of polars, so this answer might not be correct.

With both APIs, the final result is collected in memory; the difference is in the intermediate results during query execution. If your intermediate results are larger than the final result, you'll see a difference with streaming execution.

A simple example would be:

lazyframe.filter(...).map(...).collect(streaming=True)

should use less memory than

lazyframe.filter(...).map(...).collect(streaming=False)

due to not having to materialize the result of the filter operation in memory.
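
A rough sketch of the idea (file and column names hypothetical):

import polars as pl

lf = pl.scan_parquet("big.parquet")  # hypothetical larger-than-RAM file
q = lf.filter(pl.col("status") == "active").select(pl.col("amount") * 2)

# Both calls produce the same DataFrame; the difference is the peak
# memory used for intermediate results while the query executes.
result = q.collect(streaming=True)     # intermediates processed in batches
# result = q.collect(streaming=False)  # may materialize intermediates fully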

kien-truong avatar Jun 27 '23 02:06 kien-truong

Not sure this is the right place to ask, but are there any plans to support scan_ methods on byte streams?

It looks like scan_parquet only supports a path at the moment, but it would be very handy to support scanning at the byte-stream level.
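
For context, the eager readers already accept file-like objects; a minimal sketch (filename hypothetical):

import io
import polars as pl

# The eager reader accepts a file-like object, but it reads everything
# into memory at once rather than scanning lazily:
raw = open("data.parquet", "rb").read()  # stand-in for bytes from elsewhere
df = pl.read_parquet(io.BytesIO(raw))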

oxlade39 avatar Jul 13 '23 21:07 oxlade39

Not sure this is the right place to ask, but are there any plans to support scan_ methods on byte streams?

I am quite sure that there is already a feature request for that.

I will close this thread as the streaming API supports OOC processing.

ritchie46 avatar Jul 14 '23 07:07 ritchie46

I saw this issue was closed as completed. For those of us curious to find the PRs that close this issue, I've pulled a few. I'm sure there are more!

  • https://github.com/pola-rs/polars/pull/8573
  • https://github.com/pola-rs/polars/pull/7630
  • https://github.com/pola-rs/polars/pull/7604
  • https://github.com/pola-rs/polars/pull/7244

hammer avatar Aug 10 '23 13:08 hammer