
ModuleNotFoundError / SerializationError using pandas-udf

Open kbafna-antuit opened this issue 2 years ago • 6 comments

Expected Behavior

Run the pandas UDF successfully.

Current Behavior

ModuleNotFoundError / SerializationError

Steps to Reproduce (for bugs)

Place the Python file containing the function to be used at a different directory level than the file that is run, and execute via "dbx execute".

Context

I have a use case where a function is imported from a helper module ("helper") that is not located in the same place as the file that is run ("runner"). Running this by creating a job from the UI works. However, whenever we execute it via dbx, we get a SerializationError saying the module was not found. It looks like the executors are not able to access the module. I tried adding the file to the Spark context, but that doesn't work either.
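For reference, the "adding the file to the Spark context" attempt would presumably look something like the sketch below (the exact call used is not shown in the issue; repo_path is the Repos path built in runner.py further down). Note that shipping a single file this way makes it importable as helper, not tools.helper, so the unpickler on the executors would still fail to resolve tools.

# assumed sketch of the attempted workaround: ship helper.py to the executors
spark.sparkContext.addPyFile(repo_path + "tools/helper.py")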

Below is the example structure:

project/
|- __init__.py
|- tools/
   |- __init__.py
   |- helper.py
|- runners/
   |- runner.py

runner.py

import os
import sys

user_id = spark.sql('select current_user()').first()[0]
repo_path = "/Workspace/Repos/" + user_id + "/project/"
print("Repo_path:", repo_path)
sys.path.append(os.path.abspath(repo_path))
from tools.helper import subtract_mean, test_connection
print("Test Connection: ", test_connection())

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v")
)
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

helper.py

import pandas as pd

def test_connection():
    return "connection is working"

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

stderr

pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 466, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'tools'

stdout

/Workspace/Repos/[email protected]/project/
Test Connection: connection is working
ModuleNotFoundError: No module named 'tools'

Your Environment

  • python version used: 3.10
  • dbx version used: 0.17.4
  • Databricks Runtime version: 10.4 LTS

kbafna-antuit avatar Jan 23 '23 13:01 kbafna-antuit

hi @kbafna-antuit , your import is incorrect - it should be a package-based one:

from tools.helper import subtract_mean, test_connection            # incorrect
from project.tools.helper import subtract_mean, test_connection    # correct

Please read more on import statements here. This error is unrelated to dbx.

renardeinside avatar Jan 23 '23 14:01 renardeinside

@renardeinside I tried to re-create a simpler version of the issue and might have added a typo. Sorry for that. However, as you can see in my logs, my import is successful and the connection check works ( test_connection ). I only face the error on the line where I use applyInPandas and pass the function to it.

kbafna-antuit avatar Jan 23 '23 15:01 kbafna-antuit

Okay, I see. I'll take a look at it; it seems like UDFs are not being properly picked up on the worker nodes.

renardeinside avatar Jan 23 '23 15:01 renardeinside

Sure. I also found some existing discussions on the community regarding a similar issue, if that helps: https://community.databricks.com/s/question/0D53f00001M7cYMCAZ/modulenotfounderror-serializationerror-when-executing-over-databricksconnect
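For background: pandas UDFs defined in an importable module are pickled by reference on the driver and unpickled on each worker, so the workers must be able to import tools by name; the sys.path change in runner.py only happens on the driver. A workaround often suggested in such discussions is to ship the whole package to the executors as an archive before the UDF runs. A minimal sketch (the archive path is assumed):

import shutil

# bundle the tools package into a zip archive on the driver
shutil.make_archive("/tmp/tools_pkg", "zip", root_dir=repo_path, base_dir="tools")
# distribute it so the executors can import `tools` when unpickling the UDF
spark.sparkContext.addPyFile("/tmp/tools_pkg.zip")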

kbafna-antuit avatar Jan 23 '23 15:01 kbafna-antuit

Some updates on this.

In a pure dbx-execute scenario this approach works as expected:

# file: some_pkg/tasks/utils.py

import pandas as pd

# this will be a UDF
def subtract_mean(pdf: pd.DataFrame):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

# file: some_pkg/tasks/sample_task.py

from some_pkg.common import Task
from some_pkg.tasks.utils import subtract_mean


class SampleMLTask(Task):
    def launch(self):
        df = self.spark.createDataFrame(
            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
            ("id", "v")
        )
        df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

Command:

dbx execute .... --cluster-name=...

Result:

[dbx][2023-02-01 12:49:16.464] 🔊 stdout from the execution is shown below:
+---+----+
| id|   v|
+---+----+
|  1|-0.5|
|  1| 0.5|
|  2|-3.0|
|  2|-1.0|
|  2| 4.0|
+---+----+
[dbx][2023-02-01 12:49:16.467] Command execution finished
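For context, the Task base class referenced above comes from the dbx Python project template; below is a minimal sketch of the shape it is assumed to provide (see the actual template for the real implementation):

from abc import ABC, abstractmethod
from pyspark.sql import SparkSession


class Task(ABC):
    # assumed stand-in for the dbx template's base task class
    def __init__(self, spark: SparkSession = None):
        # reuse the active session on the cluster, or create one locally
        self.spark = spark or SparkSession.builder.getOrCreate()

    @abstractmethod
    def launch(self):
        ...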

renardeinside avatar Feb 01 '23 11:02 renardeinside

Not sure if this error would be resolved using the same approach, but basically we need to ensure that the folder we import from contains only .py files and no notebooks, .sh, .yaml, etc. The serializer is not able to handle files that aren't .py. A quick way to check is sketched below.
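A hypothetical sketch of that check (repo_path as in runner.py above):

from pathlib import Path

# flag non-.py files (notebooks, .sh, .yaml, ...) that may trip up serialization
non_py = [p for p in Path(repo_path, "tools").rglob("*") if p.is_file() and p.suffix != ".py"]
print("Non-.py files:", non_py)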

rsukumar-antuit avatar Feb 17 '23 16:02 rsukumar-antuit