dbx
ModuleNotFoundError / SerializationError using pandas-udf
Expected Behavior
Run pandas-udf successfully
Current Behavior
ModuleNotFoundError / SerializationError
Steps to Reproduce (for bugs)
Place the Python file containing the function to be used at a different level than the file that is run, and execute via "dbx execute".
Context
I have a use case wherein a function is imported from a "helper" file that is not located in the same place as the file which is run (the "runner"). Running this by creating a job from the UI works. However, whenever we execute it via dbx, we get a SerializationError saying the module was not found. It looks like the executors are not able to access the module. I tried adding the file to the Spark context, but that doesn't work either.
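(For reference, the Spark-context attempt looked roughly like this; the workspace path is illustrative and spark is the Databricks-provided session:)
# sketch of registering the helper file with the Spark context so executors receive it;
# the workspace path below is hypothetical
spark.sparkContext.addPyFile("/Workspace/Repos/<user>/project/tools/helper.py")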
Below is the example structure:
project/
|- __init__.py
|- tools/
|  |- __init__.py
|  |- helper.py
|- runners/
|  |- runner.py
runner.py
import sys
import os

user_id = spark.sql('select current_user()').first()[0]
repo_path = "/Workspace/Repos/" + user_id + "/project/"
print("Repo_path:", repo_path)
# make the repo root importable on the driver so the "tools" package can be found
sys.path.append(os.path.abspath(repo_path))
from tools.helper import subtract_mean, test_connection
print("Test Connection: ", test_connection())
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v")
)
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
helper.py
import pandas as pd
def did_it_work():
return print("connection is working")
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
stderr
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
return self.loads(obj)
File "/databricks/spark/python/pyspark/serializers.py", line 466, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'tools'
stdout
/Workspace/Repos/[email protected]/project/
Test Connection: connection is working
ModuleNotFoundError: No module named 'tools'
Your Environment
- Python version used: 3.10
- dbx version used: 0.17.4
- Databricks Runtime version: 10.4 LTS
hi @kbafna-antuit, your import is incorrect - it should be a package-based one:
Incorrect:
from tools.helper import subtract_mean, test_connection
Correct (package-based):
from project.tools.helper import subtract_mean, test_connection
Please read more on import statements here. This error is unrelated to dbx.
@renardeinside I tried to re-create a simpler version of the issue and might have added a typo, sorry for that. However, as you can see in my logs, my import is successful and the connection is working (did_it_work). I only face the error on the line where I use "applyInPandas" and pass the function to it.
Okay, I see. I'll take a look at it; it seems like UDFs are not being properly picked up on the worker nodes.
Sure. I also found some existing discussions on the community regarding a similar issue, if that helps. https://community.databricks.com/s/question/0D53f00001M7cYMCAZ/modulenotfounderror-serializationerror-when-executing-over-databricksconnect
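In the meantime, a workaround sketch (not verified on this exact setup; it reuses repo_path from the runner above and the paths are illustrative) is to ship the whole tools package to the executors so the pickled reference tools.helper.subtract_mean can be resolved on the workers:
import shutil
# bundle the "tools" package into /tmp/tools.zip with the package at the archive root
zip_path = shutil.make_archive("/tmp/tools", "zip", root_dir=repo_path, base_dir="tools")
# distribute the zip to the executors and add it to their PYTHONPATH
spark.sparkContext.addPyFile(zip_path)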
Some updates on this.
In a pure dbx-execute scenario this approach works as expected:
# file: some_pkg/tasks/utils.py
import pandas as pd
# this will be a UDF
def subtract_mean(pdf: pd.DataFrame):
v = pdf.v
return pdf.assign(v=v - v.mean())
# file: some_pkg/tasks/sample_task.py
from some_pkg.common import Task
from some_pkg.tasks.utils import subtract_mean
class SampleMLTask(Task):
def launch(self):
df = self.spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v")
)
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
Command:
dbx execute .... --cluster-name=...
Result:
[dbx][2023-02-01 12:49:16.464] 🔊 stdout from the execution is shown below:
+---+----+
| id| v|
+---+----+
| 1|-0.5|
| 1| 0.5|
| 2|-3.0|
| 2|-1.0|
| 2| 4.0|
+---+----+
[dbx][2023-02-01 12:49:16.467] Command execution finished
Not sure if this error would be resolved using the same approach, but basically we would need to ensure that the folder we pick to import from contains only .py files and no notebooks, .sh, .yaml, etc. It is not able to serialize files that aren't .py.
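A quick sanity check along those lines might look like this (illustrative sketch; the workspace path is hypothetical):
# list files under the imported folder that are not plain .py modules,
# since anything else may trip up serialization when the code is shipped to the workers
import pathlib
repo_path = "/Workspace/Repos/<user>/project/"  # hypothetical path
non_py = [p for p in pathlib.Path(repo_path).rglob("*") if p.is_file() and p.suffix != ".py"]
print("Non-.py files found:", non_py)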