ibis
feat(pyspark): support udaf
Is your feature request related to a problem?
No response
What is the motivation behind your request?
I was trying to apply a user-defined aggregate function to a grouped table, but @ibis.udf.agg only supports builtin functions.
Then I tried the deprecated reduction decorator and, surprisingly, it works:
import ibis
from pyspark.sql import SparkSession
from ibis.legacy.udf.vectorized import reduction
@reduction(output_type=ibis.dtype("float"), input_type=[ibis.dtype("int32")])
def avg(x) -> float:
    return x.mean()
ibis.options.interactive = True
ibis.options.verbose = True
spark = SparkSession.builder.getOrCreate()
connection = ibis.pyspark.connect(spark)
df = connection.create_view('source', ibis.memtable(dict(id1=[1, 2, 3, 1, 2, 1], id2=[4, 5, 6, 2, 3, 4])))
df = df.group_by(df.id1).aggregate(avg_id2=avg(df.id2))
print(df)
SELECT `t0`.`id1`, IBIS_UDF_AVG_12861BCE(`t0`.`id2`) AS `avg_id2` FROM `source` AS `t0` GROUP BY 1 LIMIT 11
┏━━━━━━━┳━━━━━━━━━━┓
┃ id1   ┃ avg_id2  ┃
┡━━━━━━━╇━━━━━━━━━━┩
│ int64 │ float64  │
├───────┼──────────┤
│     1 │ 3.333333 │
│     2 │ 4.000000 │
│     3 │ 6.000000 │
└───────┴──────────┘
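For reference, the contract a reduction UDF relies on is: each group's id2 values arrive as one pandas Series and the function returns a single scalar. The sketch below reproduces the expected result locally with pandas groupby, assuming pandas is installed. It only illustrates the semantics; it is not what the PySpark backend actually executes.

```python
import pandas as pd

# Same input data as the ibis memtable in the report above.
df = pd.DataFrame({"id1": [1, 2, 3, 1, 2, 1], "id2": [4, 5, 6, 2, 3, 4]})


def avg(x: pd.Series) -> float:
    # Reduction contract: one pandas Series per group in, one scalar out.
    return float(x.mean())


# Call the reducer once per id1 group, mirroring
# df.group_by(df.id1).aggregate(avg_id2=avg(df.id2)) in ibis.
result = df.groupby("id1")["id2"].agg(avg).rename("avg_id2").reset_index()
print(result)
```

The per-group means (10/3, 4.0, 6.0) match the table printed by the PySpark backend.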
Describe the solution you'd like
Is there any plan to migrate this decorator to the new @ibis.udf.agg API, so that user-defined aggregate functions work on the PySpark backend?
What version of ibis are you running?
main
What backend(s) are you using, if any?
pyspark
Code of Conduct
- [X] I agree to follow this project's Code of Conduct