
Implementing iter_rows() on a LazyFrame

Open svaningelgem opened this issue 1 year ago • 5 comments

Problem description

This way you could potentially completely stream a dataframe. Even when it's larger than life (/memory) itself.

svaningelgem avatar Aug 23 '23 10:08 svaningelgem

iter_slices() is also supported only by DataFrame and not by LazyFrame - so it will be nice to have this function supported as well.

yonil7 avatar Nov 03 '23 20:11 yonil7

This is a great enhancement, and really, wouldn't it just be adding syntactic sugar over LazyFrame.slice? https://docs.pola.rs/py-polars/html/reference/lazyframe/api/polars.LazyFrame.slice.html

For instance, I think doing this currently just involves the following:

step = 3

lf = (
    pl.from_dicts([{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}])
    .lazy()
    .with_row_count()
)

count = lf.last().collect().item(0, "row_nr")

for offset in range(0, count + 1, step):
    micro_batch = lf.slice(offset, step).collect()
    print(micro_batch)

evbo avatar Dec 31 '23 17:12 evbo

@evbo Why

range(0, count + 1, step)

instead of

range(0, count, step)

?

fdcastel avatar Feb 20 '24 20:02 fdcastel

My attempt:

from typing import Iterator

import polars as pl
from polars import DataFrame, LazyFrame


# Clone of DataFrame.iter_slices() with LazyFrame support.
# Both branches yield DataFrames: the lazy branch collects each slice.
def iter_slices(df: DataFrame | LazyFrame, n_rows: int = 10_000) -> Iterator[DataFrame]:
    if isinstance(df, DataFrame):
        for offset in range(0, df.height, n_rows):
            yield df.slice(offset, n_rows)
    else:
        # Get row count for the LazyFrame -- https://stackoverflow.com/a/75523731/33244
        row_count = df.select(pl.len()).collect().item()

        for offset in range(0, row_count, n_rows):
            yield df.slice(offset, n_rows).collect()

fdcastel avatar Feb 20 '24 21:02 fdcastel

@fdcastel with_row_count (which I think is now deprecated) generates indices starting from 0, so the last row_nr is one less than the row count - hence the + 1.

I'm unsure of the performance implications of using len vs last. Do you have any insights?
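
The off-by-one can be seen with plain Python (hypothetical numbers): with_row_count numbers rows from 0, so the last row_nr equals height - 1, and iterating up to it exclusively can skip the final slice.

```python
height = 7
count = height - 1  # what lf.last().collect().item(0, "row_nr") returns: 6
step = 3

with_plus_one = list(range(0, count + 1, step))  # [0, 3, 6] -- covers every row
without = list(range(0, count, step))            # [0, 3]    -- the slice at offset 6 is never taken
```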

evbo avatar Feb 21 '24 00:02 evbo

It currently does not work, as LazyFrame.slice() only streams with offset=0; anything else seems to load all the data into memory anyway. But if that is fixed, this pure-Python code should work without knowing the length upfront, since LazyFrame.slice() has no problem being asked for a bigger slice than what's left:

def iter_slices(df, batch_size):
    def get_batch(df, offset, batch_size):
        batch = df.slice(offset, batch_size)
        batch = batch.collect(streaming=True)
        return batch

    batch = get_batch(df, 0, batch_size)
    # Yield once even if we got passed an empty LazyFrame
    yield batch
    offset = len(batch)
    if offset:
        while True:
            batch = get_batch(df, offset, batch_size)
            len_ = len(batch)
            if len_:
                offset += len_
                yield batch
            else:
                break


for batch in iter_slices(df, 100):
    print(batch)

EDIT: added streaming=True to collect() call and renamed iter_batches() to iter_slices()

douglas-raillard-arm avatar Jun 14 '24 11:06 douglas-raillard-arm

So it looks like the snippet above is working (and has been for a few releases at least). I initially forgot the streaming=True part; with it, it does run in constant memory.

EDIT: it however does not run in O(n) time. As the iterator progresses, it gets slower and slower, probably from having to scan from the beginning every time.

douglas-raillard-arm avatar Jul 13 '24 23:07 douglas-raillard-arm

@douglas-raillard-arm I did notice a streaming slice was just added to the new streaming engine https://github.com/pola-rs/polars/pull/17451

Trying .collect(new_streaming=True) on 1.1.0 runs your example 10x faster in a simple test.

cmdlineluser avatar Jul 14 '24 01:07 cmdlineluser

@cmdlineluser That's interesting. I gave it a go, but the reason it's faster is that everything (or at least a lot) is loaded upfront. It almost instantly used all the memory on my machine.

So either way, it's not really usable yet, considering the O(N^2) time complexity at best or eager loading at worst. I can't really think of how we could avoid the O(N^2) issue without implementing the feature at a lower level. The Python version always has to restart from the beginning of the original LazyFrame and figure out where in the file the slice's data is. The only way to avoid that would be to either:

  1. Preserve the low-level state of the parser and resume from where it left off after the previous chunk (impossible from the Python API).
  2. Pre-compute some sort of array of file offsets that allows jumping straight to the data. This is also impossible to get from polars and feed back in, AFAIK.

douglas-raillard-arm avatar Jul 15 '24 09:07 douglas-raillard-arm