
Support lazy schema retrieval in IO Plugins

Open tmct opened this issue 1 year ago • 7 comments

Description

Hi,

I just wrote my first Python IO Plugin, for fetching data from a company-internal data source - very pleased that you can create LazyFrames of completely custom origin now, thanks! This brings us one step closer to expressing entire data processing pipelines declaratively without breaks using the Lazy API.

What is missing from the example, and would really help me to put my scan_custom_data_source into production, is the ability to collect the schema lazily, as has been recently-ish made possible in LazyFrames.

Is there already a way to do this, or are changes needed to register_io_source?

Many thanks, Tom

tmct avatar Sep 09 '24 16:09 tmct

Let me know if this is worth another Issue, but I am also interested in the scenario where I am able to leverage existing built-in LazyFrame scans, but want to do some lazily-defined work beforehand.

Motivating example: I would like to write my own scan_encrypted_parquet method. I can of course write a register_io_source callable which decrypts files and yields DataFrames, but this means that any compute graph defined within that method is disconnected from later compute. And the onus is then on me to pass through the with_columns args etc appropriately. Imagine for example a similar function to register_io_source, whose callable argument returns LazyFrames.

tmct avatar Sep 10 '24 09:09 tmct

Here is my work in progress, illustrating both questions I have:

def scan_parquet_sc(
    f: Path,
    schema: SchemaDict,  # ideally we would collect this lazily (and maybe cache the result if collect_schema is called before collect.) But until then, don't get this schema wrong!
) -> pl.LazyFrame:
    """Decrypts a parquet file then scans.
    
    Don't ask why we're encrypting the whole file instead of using the Parquet columnar standard..."""

    def pq_source(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        _n_rows: int | None,
        _batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        res = _decrypt_and_scan_parquet_sc(f, with_columns, predicate, _n_rows, _batch_size)
        if _batch_size is not None:
            logger.warning(f"Was passed {_batch_size=} but will ignore for now - maybe we should collect row group by row group then check if that's big enough to batch")
        yield res.collect()  # if batch=None I would perhaps like a way to return a LazyFrame?

    return register_io_source(pq_source, schema=schema)


def _decrypt_and_scan_parquet_sc(
    f: Path,
    with_columns: list[str] | None,
    predicate: pl.Expr | None,
    n_rows: int | None,
    _batch_size: int | None,
) -> pl.LazyFrame:
    pq_bytes = BytesIO(crypt.decrypt_bytes(f.read_bytes()))
    df = pl.scan_parquet(pq_bytes)  # once #10413 is released...
    if with_columns is not None:
        df = df.select(with_columns)
    if predicate is not None:
        df = df.filter(predicate)
    if n_rows is not None:
        df = df.head(n_rows)  # I'm pretty sure you want head and not fetch here
    return df


def _collect_schema_parquet_sc(f: Path) -> SchemaDict:
    # TODO hook in when lazy schema retrieval is supported; one possible implementation:
    pq_bytes = BytesIO(crypt.decrypt_bytes(f.read_bytes()))
    return pl.read_parquet_schema(pq_bytes)

tmct avatar Sep 10 '24 09:09 tmct

Hey @tmct, great that you made an IO plugin! That's what they're for.

This example shows the ability to create the schema lazily:

https://github.com/pola-rs/pyo3-polars/tree/main/example/io_plugin

ritchie46 avatar Sep 11 '24 11:09 ritchie46

Many thanks Ritchie, I'll give it a go.

tmct avatar Sep 11 '24 18:09 tmct

Does `src.schema()` in that example not realise the schema at the time of `scan_...`, though, rather than lazily? I do not understand the significance of that last instance of the Source, so I have likely misunderstood how it works.

tmct avatar Sep 11 '24 18:09 tmct

I suppose another way of describing what I want is this:

Currently I can provide a custom Python function that can be used as the basis for a LazyFrame collect() - this is the IO plugins feature. To do so, I must currently provide a fixed schema up front. (And - I am pleased that this extensibility point exists!)

But - a general LazyFrame returned from e.g. scan_parquets followed by subsequent LazyFrame operations does not necessarily know its schema up front. (Indeed, I believe the case where this is non-trivial motivated the introduction of collect_schema()?)

So - I wish to implement such "fully lazy" LazyFrames using custom Python functions please, so that I can use the Polars Lazy API as a fully lazy description of my intended tabular processing. I imagine this could take the form of me passing a python callable returning the schema.
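
The deferred-schema idea can be sketched in plain Python, independent of any polars API: wrap the (here hypothetical) expensive schema lookup in a cached zero-argument callable, so nothing is fetched when the scan is constructed, and repeated requests (say, `collect_schema` followed by `collect`) pay the IO cost only once. The schema contents below are illustrative:

```python
from functools import lru_cache

call_count = 0  # tracks how often the expensive lookup actually runs


@lru_cache(maxsize=1)
def fetch_schema() -> dict:
    """Hypothetical expensive lookup (would hit the data source in practice)."""
    global call_count
    call_count += 1
    return {"a": "Int64", "b": "Utf8"}


# Nothing has been fetched at definition/registration time.
assert call_count == 0
# The first request triggers the lookup; later ones hit the cache.
assert fetch_schema() == {"a": "Int64", "b": "Utf8"}
fetch_schema()
assert call_count == 1
```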

Do you think this would be possible? Desirable?

Thank you

tmct avatar Sep 20 '24 18:09 tmct

I've made an attempt at starting the public-facing side of this feature in #19232.

Any feedback or help would be much welcomed, thanks.

tmct avatar Oct 14 '24 21:10 tmct

Thanks to some help from @itamarst I now have a branch of my fork with this feature on it! It's not prod-ready at this point but I will have a play with it.

Example (passing) test from poc branch: https://github.com/tmct/polars/blob/8773d9c/py-polars/tests/unit/io/test_plugins.py#L39

tmct avatar Oct 30 '24 17:10 tmct

@tmct your scan function can register the callable on every call, and at that registration you provide the schema.

In pseudo code

def scan_foo(file_name) -> pl.LazyFrame:
    def source_generator(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        n_rows: int | None,
        batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        """
        Generator function that creates the source.
        This function will be registered as IO source.
        """

        new_size = size  # "size" comes from the surrounding source definition
        if n_rows is not None and n_rows < size:
            new_size = n_rows

        src = MySource()
        if with_columns is not None:
            src.set_with_columns(with_columns)

        # Set the predicate.
        predicate_set = True
        if predicate is not None:
            try:
                src.try_set_predicate(predicate)
            except pl.exceptions.ComputeError:
                predicate_set = False

        while (out := src.next()) is not None:
            # If the source could not apply the predicate
            # (because it wasn't able to deserialize it), we do it here.
            if not predicate_set and predicate is not None:
                out = out.filter(predicate)

            yield out

    # create src again to compute the schema
    src = MySource(samplers, 0, 0)
    return register_io_source(callable=source_generator, schema=src.schema())

ritchie46 avatar Jan 29 '25 11:01 ritchie46

Hi @ritchie46, your example doesn't solve this issue. What we want is to not have to know the schema until collect, collect_schema or explain etc are called. In your example, you still need to do some IO to determine the schema when scan_foo is called (where you call src.schema()).

The built-in scan_parquet etc. methods don't have this behaviour: scan_parquet doesn't immediately read the Parquet file when it is called; reading is deferred until the schema is needed when creating an execution plan.

adamreeve avatar Feb 02 '25 23:02 adamreeve

Right, I agree. It differs from the other scans. It is something that would be problematic for cloud. Is it problematic for your use cases though?

Maybe the schema should accept a callable that returns the schema.

ritchie46 avatar Feb 03 '25 09:02 ritchie46

Is it problematic for your use cases though?

I'm not 100% sure of the details, I'll let @tmct answer that.

Maybe the schema should accept a callable that returns the schema.

Yes, this is the approach being taken in #19232 (see https://github.com/pola-rs/polars/pull/19232/files#diff-5cab423b4448a3a63943ca166f0129ffd7425c114acc8f05835e76b5f9046de3R43)

adamreeve avatar Feb 03 '25 09:02 adamreeve

Will make it a callable. 👍

ritchie46 avatar Feb 03 '25 09:02 ritchie46

Yes, the lack of this has been problematic - for example, we have some abstractions that wrap LazyFrames, and I think these can and should become pure LazyFrames, accompanied by domain-specific extensions and IO sources. We have in-house Python libraries that retrieve data of varying schemas, and it is going to be hugely faster to get these working as IO Plugins (or at least prove the concept...) than to do full Rust rewrites.

Passing a callable sounds great to me - thank you very much.

tmct avatar Feb 03 '25 15:02 tmct

That should still work. All scans had the schema in the DSL a year ago and that worked fine for local execution. I am adding the callable option, but I do think it should be workable as is already.

ritchie46 avatar Feb 03 '25 17:02 ritchie46

Thanks for adding this! (#21079)

adamreeve avatar Feb 10 '25 23:02 adamreeve