
feat(api): Vectorized UDAFs

Open ogrisel opened this issue 3 years ago • 12 comments

duckdb does not support scalar User Defined Functions written in Python (to be applied one record at a time), but it does expose a vector Python UDF via the map method:

>>> import pandas as pd
>>> import duckdb
>>> df = pd.DataFrame({"x": range(int(1e4))})
>>> def process_chunk(df_chunk):
...     print(f"processing chunk of size {df_chunk.shape[0]}")
...     return df_chunk * 2
... 
>>> duckdb.from_df(df).map(process_chunk).to_df()
processing chunk of size 0
processing chunk of size 0
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 1024
processing chunk of size 784
          x
0         0
1         2
2         4
3         6
4         8
...     ...
9995  19990
9996  19992
9997  19994
9998  19996
9999  19998

[10000 rows x 1 columns]

The main motivation for this vector Python UDF API is probably to hide the per-record Python function call overhead. I think it's a pragmatic API, and it would make it possible to efficiently deploy trained machine learning models for batch scoring in an out-of-core manner, for instance.

Any chance to expose such vector Python UDFs via the Ibis API?

Also, if some backends already support or add support for Python UDAFs (especially in parallel, via combiners in addition to mappers and reducers), this would open the possibility to train machine learning models (e.g. with scikit-learn or Pytorch) directly via Ibis. As far as I know, duckdb unfortunately does not expose parallel Python UDAFs.
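To make the shape of such a UDAF concrete, here is a plain-Python sketch (no Ibis or duckdb API implied) of the mapper / combiner / reducer stages, computing a mean over chunks; a training UDAF would carry model state through the same three roles:

```python
import pandas as pd

def mapper(chunk: pd.DataFrame) -> tuple[float, int]:
    # Per-chunk partial state: (sum, count), computed independently per chunk.
    return float(chunk["x"].sum()), len(chunk)

def combiner(a: tuple[float, int], b: tuple[float, int]) -> tuple[float, int]:
    # Merges two partial states; associative, so it can run in parallel.
    return a[0] + b[0], a[1] + b[1]

def reducer(state: tuple[float, int]) -> float:
    # Finalizes the merged state into the aggregate value.
    total, count = state
    return total / count

chunks = [pd.DataFrame({"x": range(i, i + 1000)}) for i in range(0, 10_000, 1000)]
state = mapper(chunks[0])
for chunk in chunks[1:]:
    state = combiner(state, mapper(chunk))
print(reducer(state))  # 4999.5
```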

Final side request: for backends that only support scalar UDFs, would it be possible for Ibis to generate the SQL required to do the chunking itself and expose a vector UDF API to hide the Python function call overhead, similarly to what duckdb is doing internally with map?

ogrisel avatar Oct 23 '22 14:10 ogrisel

@ogrisel Thanks for the issue. This is definitely on our radar and we'll probably start experimenting with support for this in the next month. In fact, the DuckDB folks just pointed us to .map a few weeks ago.

Any chance to expose such vector Python UDFs via the Ibis API?

So, in short, yes there's a great chance of this happening :)

this would open the possibility to train machine learning models (e.g. with scikit-learn or Pytorch) directly via Ibis

This is an interesting path for us to go down; it's great to hear a concrete use case for UDAFs since getting them to work well with a nice API and solid performance will be challenging.

for backends that only support scalar UDFs, would it be possible for Ibis to generate the SQL required to do the chunking itself and expose a vector UDF API to hide the Python function call overhead, similarly to what duckdb is doing internally with map?

Possibly! I think we'll need to do some prototyping before we can give a concrete yes or no to this.

Really appreciate all the issues you're opening, it's wonderful to get feedback from users ❤️

cpcloud avatar Oct 23 '22 16:10 cpcloud

this would open the possibility to train machine learning models (e.g. with scikit-learn or Pytorch) directly via Ibis

This is an interesting path for us to go down; it's great to hear a concrete use case for UDAFs since getting them to work well with a nice API and solid performance will be challenging.

Note that UDF (without the A) support is enough to "apply" a trained model to a large dataset for scoring (computing predictions / recommendations / price estimates / anomaly scores...) in an out-of-core and possibly distributed manner (if the backend is distributed), assuming the UDF workers are long-lived and can reuse a cached model loaded by name from a shared filesystem or object store, so that several chunks can be processed without re-paying the filesystem I/O and model loading overhead for each chunk.
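As a rough sketch of what I mean, using the duckdb map API shown above (joblib, the model.joblib path, the LinearRegression model and the "x" feature column are all made-up details for illustration):

```python
import functools

import duckdb
import joblib
import pandas as pd
from sklearn.linear_model import LinearRegression

# Pretend a model was trained elsewhere and persisted to a shared location.
train = pd.DataFrame({"x": range(100)})
joblib.dump(LinearRegression().fit(train[["x"]], 2 * train["x"]), "model.joblib")


@functools.lru_cache(maxsize=1)
def load_model(path="model.joblib"):
    # Loaded once per long-lived worker and reused for every chunk.
    return joblib.load(path)


def score_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    if chunk.empty:
        # duckdb probes map with empty chunks to infer the output schema.
        return pd.DataFrame({"score": pd.Series(dtype="float64")})
    return pd.DataFrame({"score": load_model().predict(chunk[["x"]])})


features = pd.DataFrame({"x": range(int(1e4))})
scores = duckdb.from_df(features).map(score_chunk).to_df()
```

The model loading cost is then paid once per worker rather than once per chunk, which is the point of the long-lived workers mentioned above.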

This alone would be quite valuable, I think. Of course, distributed model training with UDAFs would also be interesting, but it would require a lot more work (and internal changes in scikit-learn in particular).

ogrisel avatar Oct 23 '22 16:10 ogrisel

Also, a related question: does the dask backend support (vector) UDAFs? It should be quite natural to implement; however, the documentation on UDFs is lacking.

ogrisel avatar Oct 23 '22 16:10 ogrisel

Reading the clickhouse docs, it seems that it would be natural to implement Python vector UDFs with clickhouse as records are batched by default:

https://clickhouse.com/docs/en/sql-reference/functions/#executable-user-defined-functions

Both clickhouse and pandas can efficiently exchange data via the Arrow format (chunked dataframes): https://clickhouse.com/docs/en/interfaces/formats/#data-format-arrow, so implementing a duckdb-style map that presents the chunk data as a pandas dataframe should be quite easy for this backend.
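For illustration only, the script side of such an executable UDF might look roughly like this, assuming (unverified) that the function can be configured with the ArrowStream format so whole record batches arrive on stdin and results are written back on stdout; the `chunk * 2` computation is just a placeholder mirroring the duckdb example above:

```python
#!/usr/bin/env python
import sys

import pyarrow as pa

reader = pa.ipc.open_stream(sys.stdin.buffer)
writer = None
for batch in reader:
    chunk = batch.to_pandas()                       # one batch of records as a DataFrame
    result = pa.RecordBatch.from_pandas(chunk * 2)  # placeholder per-chunk computation
    if writer is None:
        writer = pa.ipc.new_stream(sys.stdout.buffer, result.schema)
    writer.write_batch(result)
if writer is not None:
    writer.close()
```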

ogrisel avatar Oct 23 '22 21:10 ogrisel

@cpcloud I drafted a proposal for Python UDAFs API in a duckdb issue if you are interested: https://github.com/duckdb/duckdb/discussions/5117.

ogrisel avatar Oct 28 '22 10:10 ogrisel

cc @icexelloss

jreback avatar Oct 28 '22 11:10 jreback

We now have vectorized scalar UDFs in the DuckDB backend. Check out the related blog post.
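For readers landing here, a minimal sketch of one of those UDFs (assuming a recent ibis where DuckDB is the default backend; `double` is just a made-up example):

```python
import ibis
from ibis import udf


@udf.scalar.pandas
def double(x: float) -> float:
    # x arrives as a pandas Series covering a whole batch of rows, not one value.
    return x * 2


t = ibis.memtable({"x": list(range(5))})
print(t.mutate(y=double(t.x)).to_pandas())  # executed by the default DuckDB backend
```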

cpcloud avatar Jul 05 '23 11:07 cpcloud

We now have vectorized scalar UDFs in the DuckDB backend. Check out the related blog post.

Thanks, this is great. A few remarks:

  • I cannot find those in the reference documentation. I had to read the source code to list the various UDF alternatives (Python scalar values, Pandas Series and PyArrow Arrays);
  • While I could successfully write a test function that processes more than one input column at a time, I do not see how it would be possible to use this to write a function that generates more than one return value and interprets them as new individual columns with user-controllable names. If it's possible to do so with the existing API, it would be great to showcase this in an example snippet, e.g. in the docstrings of the UDFs;
  • Is it possible to control the chunking? At the moment it is completely implicit. Maybe some users would like to be given some control over the memory-usage / per-Python-call-overhead tradeoff.

ogrisel avatar Aug 21 '23 14:08 ogrisel

I cannot find those in the reference documentation.

Yeah, this is something we need to add. I'll create an issue to track this work.

While I could successfully write a test function that processes more than one input column at a time, I do not see how it would be possible to use this to write a function that generates more than one return value and interprets them as new individual columns with user-controllable names.

That's a good question.

Here's an extremely contrived but hopefully illustrative example of how you might do this:

In [25]: from ibis.interactive import *

In [26]: import ibis.expr.datatypes as dt

In [27]: @udf.scalar.python
    ...: def make_struct(
    ...:     height: int, mass: float
    ...: ) -> dt.Struct(dict(height_cm="int", mass_kg="float")):
    ...:     return dict(height_cm=height, mass_kg=mass)

In [28]: t = ex.starwars.fetch(table_name="starwars")

In [29]: t.select(dims=make_struct(t.height, t.mass)).unpack("dims")
Out[29]:
┏━━━━━━━━━━━┳━━━━━━━━━┓
┃ height_cm ┃ mass_kg ┃
┡━━━━━━━━━━━╇━━━━━━━━━┩
│ int64     │ float64 │
├───────────┼─────────┤
│       172 │    77.0 │
│       167 │    75.0 │
│        96 │    32.0 │
│       202 │   136.0 │
│       150 │    49.0 │
│       178 │   120.0 │
│       165 │    75.0 │
│        97 │    32.0 │
│       183 │    84.0 │
│       182 │    77.0 │
│         … │       … │
└───────────┴─────────┘

In [30]: import pyarrow.compute as pc
    ...:
    ...:
    ...: @udf.scalar.pyarrow
    ...: def make_struct_arrow(
    ...:     height: int, mass: float
    ...: ) -> dt.Struct(dict(height_cm="int", mass_kg="float")):
    ...:     return pc.make_struct(height, mass, field_names=("height_cm", "mass_kg"))

In [31]: t.select(dims=make_struct_arrow(t.height, t.mass)).unpack("dims")
Out[31]:
┏━━━━━━━━━━━┳━━━━━━━━━┓
┃ height_cm ┃ mass_kg ┃
┡━━━━━━━━━━━╇━━━━━━━━━┩
│ int64     │ float64 │
├───────────┼─────────┤
│       172 │    77.0 │
│       167 │    75.0 │
│        96 │    32.0 │
│       202 │   136.0 │
│       150 │    49.0 │
│       178 │   120.0 │
│       165 │    75.0 │
│        97 │    32.0 │
│       183 │    84.0 │
│       182 │    77.0 │
│         … │       … │
└───────────┴─────────┘

The approach is basically to stuff everything into a struct and then unpack the columns into the containing table as needed.

The primary downside is that column names must be declared statically. You can't use the result of the UDF to compute the column names.

Is it possible to control the chunking?

AFAIK, no.

At what point do you want to control the chunking? When you call to_pandas(), or earlier, such as when you define the function?

What would your ideal API look like for controlling the chunking?

cpcloud avatar Aug 21 '23 14:08 cpcloud

At what point do you want to control the chunking? When you call to_pandas(), or earlier, such as when you define the function?

Not sure what you mean by "when you call to_pandas()" (when calling the function itself?). I think it should be fine to do it at function definition time.

What would your ideal API look like for controlling the chunking?

Maybe as an argument to the decorator, to either control the target number of records per chunk or the average size in bytes of each chunk.
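For concreteness, something like the following, where max_rows_per_chunk is a purely hypothetical keyword that does not exist in ibis today:

```python
# Hypothetical API sketch only; no such parameter currently exists.
@udf.scalar.pandas(max_rows_per_chunk=10_000)
def score(x: float) -> float:
    return x * 2
```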

But honestly this might be a YAGNI for most people.

ogrisel avatar Aug 22 '23 07:08 ogrisel

The primary downside is that column names must be declared statically. You can't use the result of the UDF to compute the column names.

That's fine. Since the type declaration, the use of pyarrow.compute.make_struct, and the combination with the unpack method are not trivial, I think it would be helpful to demonstrate this more advanced case as an example doctest in the docstring.

ogrisel avatar Aug 22 '23 07:08 ogrisel

Vectorized (per-element) UDFs have been around for a while, retitling this to reflect that.

cpcloud avatar Jun 25 '24 15:06 cpcloud

Closing due to lack of activity.

cpcloud avatar Sep 11 '24 17:09 cpcloud

Hi @cpcloud, hopefully I will get a response for this. I am looking to process multiple groups of an ibis table with one function. Can I perform parallelization in ibis in some way?

Scenario:

| Item  | Week | Sales_Qty |
|-------|------|-----------|
| item1 | ...  | ...       |
| item2 | ...  | ...       |
| item1 | ...  | ...       |

I have multiple items and I want to apply the function below to each item in parallel. Can you suggest something, @cpcloud?

import ibis
from ibis.expr.types import Table as ibis_table


def modify_zero_demand_as_missing_or_legitmate(
    timeseries: ibis_table,
) -> ibis_table:
    # Compute the mean and stddev ignoring zeros and NaNs (NULLs/NaNs are
    # excluded by default by the mean and std aggregations).
    timeseries = timeseries.order_by("WEEK").mutate(
        INDEX=ibis.row_number().over(order_by="WEEK")
    )
    non_zero_mean = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.mean().execute()
    )
    non_zero_std = (
        timeseries.filter(timeseries.SALES_QTY != 0).SALES_QTY.std().execute()
    )
    consider_zero_as_missing = 0 < (non_zero_mean - 3 * non_zero_std)
    if consider_zero_as_missing:
        first_sale_index = timeseries.filter(
            timeseries.SALES_QTY != 0
        ).INDEX.min()
        timeseries = timeseries.mutate(
            SALES_QTY=ibis.case()
            .when(
                (timeseries.INDEX >= first_sale_index)
                & (timeseries.SALES_QTY == 0),
                None,
            )
            .else_(timeseries.SALES_QTY)
            .end()
        )
    return timeseries

Thank you

Hg03 avatar May 25 '25 13:05 Hg03