
bug: UDFs not present on pyspark workers

Open NickCrews opened this issue 1 year ago • 0 comments

What happened?

Discovered in https://github.com/NickCrews/mismo/issues/64. CC @jstammers. Here is a more minimal reproducer.

Run it with uv run script.py so that uv installs the dependencies automatically, or install them manually and then run python script.py.

# /// script
# requires-python = ">=3.11"
# dependencies = [
#     "ibis-framework[duckdb]",
#     "pyspark",
# ]
# ///
from __future__ import annotations

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import ibis

spark = SparkSession.builder.master("local").appName("My App").getOrCreate()


@ibis.udf.scalar.python
def ibis_udf_global(x: int) -> int:
    return x + 1


@F.pandas_udf(returnType="int")
def spark_udf(inp: pd.Series) -> pd.Series:
    @ibis.udf.scalar.python
    def ibis_udf_local(x: int) -> int:
        return x + 1

    t = ibis.memtable({"inp": inp})
    t = t.mutate(out=ibis_udf_global(t.inp))
    return t.out.to_pandas()


df = spark.createDataFrame(pd.DataFrame({"inp": (1, 2, 3)}))
print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())

This raises the AttributeError shown below. If I replace the call to ibis_udf_global with a call to ibis_udf_local, the script works.

Traceback
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 36, in <module>
    print(df.withColumn("prediction", spark_udf(F.col("inp"))).toPandas())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py", line 202, in toPandas
    rows = self.collect()
           ^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 1263, in collect
    sock_info = self._jdf.collectToPython()
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 185, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/nc/code/ibis/.scratch/bug_udf_pyspark.py", line 31, in spark_udf
    t = t.mutate(out=ibis_udf_global(t.inp))
                     ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/common/deferred.py", line 613, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/.cache/uv/archive-v0/jW0IZYuX57G6RMPQ9lhv3/lib/python3.11/site-packages/ibis/expr/operations/udf.py", line 165, in construct
    return node(*args, **kwargs).to_expr()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/bases.py", line 72, in __call__
    return cls.__create__(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 120, in __create__
    return super().__create__(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nc/code/ibis/ibis/common/grounds.py", line 199, in __init__
    object.__setattr__(self, name, value)
AttributeError: 'ibis_udf_global_0' object has no attribute 'x'

What version of ibis are you using?

main

What backend(s) are you using, if any?

pyspark

Relevant log output

No response

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

NickCrews, Oct 05 '24 15:10