No module named '__prefect_loader__'
Description
The flow run errors with:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 216, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_deployment(deployment, client=client)
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 276, in load_flow_from_deployment
    flow = await maybe_flow.unpackage()
  File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 107, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/packaging/orion.py", line 24, in unpackage
    return self.serializer.loads(serialized_flow.encode())
  File "/usr/local/lib/python3.9/site-packages/prefect/packaging/serializers.py", line 190, in loads
    return from_qualified_name(blob.decode())
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 58, in from_qualified_name
    module = importlib.import_module(mod_name)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1030, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1007, in _find_and_load
  File "<frozen importlib._bootstrap>", line 984, in _find_and_load_unlocked
ModuleNotFoundError: No module named '__prefect_loader__'
Reproduction / Example
flows/kubes_flow.py:
from prefect import flow, get_run_logger
from prefect.deployments import Deployment
from prefect.flow_runners import KubernetesFlowRunner
from prefect.packaging.orion import OrionPackager
from prefect.packaging.serializers import ImportSerializer


@flow
def kubes_flow() -> None:
    # shown in kubectl logs but not in the Prefect UI
    print("Hello from Kubernetes!")

    # shown in the Prefect UI
    logger = get_run_logger()
    logger.info("Hello Prefect UI from Kubernetes!")


# use the default OrionPackager, but with an ImportSerializer, so that only the
# flow's import path is stored, since the flow code is already inside the Docker image
Deployment(
    name="kubes-deployment-orion-packager-import",
    flow=kubes_flow,
    flow_runner=KubernetesFlowRunner(
        image="orion-registry:5000/flow:latest",
    ),
    packager=OrionPackager(serializer=ImportSerializer()),
)
Here's the block for my flow:
curl -s "http://localhost:4200/api/block_documents/a123815c-7a51-4446-8d56-10dc9fd14047" | jq .
{
  "id": "a123815c-7a51-4446-8d56-10dc9fd14047",
  "created": "2022-07-09T11:19:50.140724+00:00",
  "updated": "2022-07-09T11:19:50.140803+00:00",
  "name": "anonymous:48388c349c907ec7959e911b9d42eebd",
  "data": {
    "value": {
      "flow": "__prefect_loader__.kubes_flow"
    }
  },
  "block_schema_id": "02afbc00-fc1e-4dd5-8d42-57b165376620",
  "block_schema": {
    "id": "02afbc00-fc1e-4dd5-8d42-57b165376620",
    "created": "2022-07-09T06:38:11.900676+00:00",
    "updated": "2022-07-09T06:41:41.126000+00:00",
    "checksum": "sha256:767ab2520040f319ca8d60e137cf23f9698fe51deb30b2b2f5848d0944a336d7",
    "fields": {
      "title": "JSON",
      "description": "A block that represents JSON",
      "type": "object",
      "properties": {
        "value": {
          "title": "Value",
          "description": "A JSON-compatible value"
        }
      },
      "required": [
        "value"
      ],
      "block_type_name": "JSON",
      "secret_fields": [],
      "block_schema_references": {}
    },
    "block_type_id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
    "block_type": {
      "id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
      "created": "2022-07-09T06:38:11.722069+00:00",
      "updated": "2022-07-09T06:38:11.722297+00:00",
      "name": "JSON",
      "logo_url": null,
      "documentation_url": null,
      "description": null,
      "code_example": null,
      "is_protected": false
    },
    "capabilities": []
  },
  "block_type_id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
  "block_type": {
    "id": "4dfbd6a2-ba1b-4b44-bfb3-c2732f9fe5dd",
    "created": "2022-07-09T06:38:11.722069+00:00",
    "updated": "2022-07-09T06:38:11.722297+00:00",
    "name": "JSON",
    "logo_url": null,
    "documentation_url": null,
    "description": null,
    "code_example": null,
    "is_protected": false
  },
  "block_document_references": {},
  "is_anonymous": true
}
It looks like __prefect_loader__ is being stored as the module name (giving __prefect_loader__.kubes_flow) rather than flows.kubes_flow.
prefect 2.0b8
When the flow script is run directly, we lose the ability to infer the module name, since it's replaced with __main__ if you call python <your_script.py>, or with __prefect_loader__ if we run the script for you. If you move your deployment into a separate file and import the flow, we will be able to use that module as the flow's import path:
from prefect.deployments import Deployment
from my_flows import kubes_flow

Deployment(
    flow=kubes_flow,
)
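The behaviour described above can be reproduced with the standard library alone. This is a sketch, assuming the script loader works roughly like importlib.util.spec_from_file_location with a synthetic module name; load_as_module, import_path and from_import_path are hypothetical helpers, not Prefect's API. Whether the script runs as __main__ or under __prefect_loader__, the recorded path names a module that a fresh process cannot import.

```python
import importlib
import importlib.util
import pathlib
import runpy
import tempfile

# Stand-in for flows/kubes_flow.py; the flow is a plain function here.
SCRIPT = "def kubes_flow():\n    return 'Hello from Kubernetes!'\n"


def load_as_module(path: str, module_name: str):
    """Execute a file as a module under an arbitrary name (assumed to mimic a script loader)."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    # Not registered in sys.modules, like a fresh flow-run process that never loaded it.
    spec.loader.exec_module(module)
    return module


def import_path(obj) -> str:
    """Record an object as '<module>.<qualname>', like an import-path serializer."""
    return f"{obj.__module__}.{obj.__qualname__}"


def from_import_path(path: str):
    """Resolve '<module>.<attr>' back to an object in the current process."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


with tempfile.TemporaryDirectory() as tmp:
    script = pathlib.Path(tmp) / "kubes_flow.py"
    script.write_text(SCRIPT)
    # `python kubes_flow.py`: the script's module is named __main__.
    main_globals = runpy.run_path(str(script), run_name="__main__")
    as_main = import_path(main_globals["kubes_flow"])
    # The script loaded by a tool under a synthetic name.
    as_loader = import_path(load_as_module(str(script), "__prefect_loader__").kubes_flow)

print(as_main)    # __main__.kubes_flow
print(as_loader)  # __prefect_loader__.kubes_flow

try:
    from_import_path(as_loader)
except ModuleNotFoundError as exc:
    print(exc)  # No module named '__prefect_loader__'
```

Neither recorded path mentions flows.kubes_flow, which is why the import serializer cannot find the flow again.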
I'll try to find a way to determine the module path when they're in the same file.
Also note that if your module is only importable relative to the current directory rather than installed, this will fail when using the CLI. I'll investigate a fix for that, but in the meantime you can call Deployment(...).create() directly and run the script with Python to get past it.
Thanks @madkinsz, I'll try that. FYI, I'm creating the deployment via the CLI, e.g.: prefect deployment create flows/kubes_flow.py.
Yep splitting out the deployment into its own file works! 🥳
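A stdlib-only sketch of why splitting the deployment out works: once the flow is imported from a real module, its __module__ is that module's name, so the recorded import path resolves again in any process where the module is importable. my_flows mirrors the placeholder name above; putting a temporary directory on sys.path stands in for installing the package or copying it into the image, which is an assumption about your setup.

```python
import importlib
import pathlib
import sys
import tempfile

# Hypothetical stand-in for my_flows.py, kept in its own module.
MY_FLOWS = "def kubes_flow():\n    return 'Hello from Kubernetes!'\n"

with tempfile.TemporaryDirectory() as tmp:
    pathlib.Path(tmp, "my_flows.py").write_text(MY_FLOWS)
    # Putting the directory on sys.path stands in for installing the package
    # (or copying it into the image) so that it is importable by name.
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        # The deployment file imports the flow instead of defining it.
        from my_flows import kubes_flow

        stored = f"{kubes_flow.__module__}.{kubes_flow.__qualname__}"
        module_name, _, attr = stored.rpartition(".")
        # Resolving the stored path finds the same function again.
        reloaded = getattr(importlib.import_module(module_name), attr)
    finally:
        sys.path.remove(tmp)

print(stored)                  # my_flows.kubes_flow
print(reloaded is kubes_flow)  # True
```

The same dependency on sys.path is behind the CLI caveat above: the path only resolves if my_flows is importable in the process that loads it.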
I get a similar error when using DaskTaskRunner and calling a function defined in the same file as the task and flow.
Error
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2157, in execute
    function, args, kwargs = await self._maybe_deserialize_task(ts)
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2127, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*ts.run_spec)
  File "/usr/local/lib/python3.9/site-packages/distributed/worker.py", line 2755, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/usr/local/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named '__prefect_loader__'
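The Dask failure appears to follow the same pattern. Below is a sketch using the standard pickle module as a stand-in for distributed's serialization (distributed may also use cloudpickle, so the exact path can differ): a function defined in a module registered as __prefect_loader__ is pickled by reference, i.e. as a module name plus a qualified name, and unpickling it in a process that never loaded that module raises the same ModuleNotFoundError. The loader steps are an assumption about how the script gets loaded, and deleting the sys.modules entry simulates a Dask worker.

```python
import importlib.util
import pathlib
import pickle
import sys
import tempfile

# Stand-in for flow.py from the example above.
FLOW_FILE = "def inner():\n    return 0\n"

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "flow.py"
    path.write_text(FLOW_FILE)
    # Load flow.py under a synthetic name and register it (assumed loader behaviour).
    spec = importlib.util.spec_from_file_location("__prefect_loader__", str(path))
    module = importlib.util.module_from_spec(spec)
    sys.modules["__prefect_loader__"] = module
    spec.loader.exec_module(module)

    # pickle stores the function by reference: module name plus qualified name.
    payload = pickle.dumps(module.inner)

print(b"__prefect_loader__" in payload)  # True

# A worker process never ran that loader, so the module is missing there.
del sys.modules["__prefect_loader__"]
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    print(exc)  # No module named '__prefect_loader__'
```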
Minimal reproducible example
I have the following 2 files.
flow.py:
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


def inner():
    return 0


@task
def bar():
    inner()


@flow(task_runner=DaskTaskRunner())
def foo():
    future = bar()
    future.wait()
spec.py:
from prefect.deployments import Deployment
from flow import foo

spec = Deployment(
    flow=foo,
    tags=["test"],
)
Related to #6629
See also https://github.com/PrefectHQ/prefect/issues/6762
FWIW, I ran into a similar error when attempting to run a computation on a Dask cluster from inside a running Prefect flow. Here's a description of the particular issue I hit, along with a change to distributed that avoids it: https://github.com/dask/distributed/pull/8502. Just posting here for visibility.