Running a pyarrow UDF on a PySpark DataFrame results in non-vectorized execution
What happened?
Running a pyarrow-flavored scalar UDF against the PySpark backend results in non-vectorized, row-at-a-time execution.
Sample code:
from pyspark.sql import SparkSession

import ibis
import pyarrow.compute as pc


def main():
    session = SparkSession.builder.getOrCreate()
    con = ibis.pyspark.connect(session=session)
    p = con.read_csv("./penguins.csv.gz")
    con.create_view(
        "penguins",
        p,
        # database=db,
        overwrite=True,
    )
    udf = ibis.udf.scalar.pyarrow(somma)
    t = con.table("penguins")
    print(
        t.filter(t.species == "Adelie")
        .mutate(weird_col=udf(t.year, t.body_mass_g))
        .select("year", "body_mass_g", "weird_col")
        .execute()
    )


def somma(x: int, y: int) -> int:
    # I can't annotate this with the actual runtime type (pyarrow.lib.Int64Array);
    # doing so raises "`dtype`: <class 'pyarrow.lib.Int64Array'> is not either None
    # or coercible to a DataType". Those are the runtime types, though, and the
    # same hints work on both the pyspark and polars backends.
    return pc.add(x, y)


if __name__ == "__main__":
    main()
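For comparison, here is a minimal self-contained repro of the vectorized behavior I expect, using the decorator form of the same UDF against the polars backend (the memtable data here is made up for illustration):

import ibis
import pyarrow.compute as pc


@ibis.udf.scalar.pyarrow
def somma(x: int, y: int) -> int:
    return pc.add(x, y)


t = ibis.memtable({"year": [2007, 2008], "body_mass_g": [3750, 3800]})
con = ibis.polars.connect()
# On polars, somma receives whole pyarrow arrays, not Python scalars
print(con.execute(t.mutate(weird_col=somma(t.year, t.body_mass_g))))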
If I patch pyarrow's compute.py to log the arguments of each call, like so:
def wrapper(*args, memory_pool=None):
    print("****")
    print(type(args[1]))
    print(args[1])
    print("****")
I get the following output on the PySpark backend, one invocation per row:
****
<class 'float'>
3700.0
****
****
<class 'float'>
4000.0
****
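(These row-at-a-time calls succeed silently rather than raising, because pyarrow compute functions also accept plain Python scalars; a quick check:)

import pyarrow.compute as pc

# pyarrow kernels happily take scalars and return a pyarrow scalar,
# e.g. something like <pyarrow.DoubleScalar: 5707.0>
print(pc.add(3700.0, 2007))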
If I run the same code with the polars backend, I get only one invocation:
****
<class 'pyarrow.lib.Int64Array'>
[
3750,
3800,
3250,
...
Is this expected?
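A possible workaround might be a pandas-flavored UDF instead, on the assumption that the PySpark backend lowers it to Spark's Arrow-backed pandas_udf and therefore executes in batches; this is an untested sketch:

import ibis


# Assumption: on the pyspark backend this becomes a vectorized
# pandas_udf, so x and y arrive as pandas.Series batches rather
# than one Python scalar per row.
@ibis.udf.scalar.pandas
def somma_pd(x: int, y: int) -> int:
    return x + y  # element-wise Series addition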
What version of ibis are you using?
10.2.0
What backend(s) are you using, if any?
PySpark
Relevant log output
Code of Conduct
- [x] I agree to follow this project's Code of Conduct