import of module '__prefect_loader__' failed when 'multiprocessing.Pool.apply_async' is used within the flow via remote deployments
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
Invoked directly
If the flow is run directly with the command python example.py, it completes successfully without any issues.
Invoked via agent through a deployment
If a deployment is created and the flow run is executed by an agent, pickling the function passed to apply_async fails because the module '__prefect_loader__' cannot be imported.
Reproduction
from prefect import flow
from multiprocessing import Pool
import time


@flow
def foo(a):
    p = Pool(4)
    for i in range(a):
        p.apply_async(bar, (i+1,), callback=ok, error_callback=error)
    p.close()
    p.join()


def ok(x):
    print(f"I am ok {x}")


def error(x):
    print(f"I am error {x}")


def bar(b):
    print('hi')
    time.sleep(b)
    print('bye')


if __name__ == '__main__':
    foo(5)
Error
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
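For context, the same kind of pickling failure can be reproduced without Prefect. The sketch below is only an illustration under an assumption: it assumes the deployment loads example.py from its file path under a synthetic module name that is not left in sys.modules afterwards. The name __demo_loader__ and the helpers load_under_synthetic_name and try_pickle are hypothetical and are not part of Prefect. Because pickle serializes functions by reference (module name plus qualified name), it has to import the defining module, which fails when that name is not importable.

```python
import importlib.util
import pickle
import sys
import tempfile
from pathlib import Path

# Stand-in for example.py; only the worker function matters here.
SOURCE = "def bar(b):\n    return b\n"


def load_under_synthetic_name(path, name="__demo_loader__"):
    """Load a script from a file path under a made-up module name.

    The module is deliberately NOT registered in sys.modules, which is the
    assumed situation behind the error reported above.
    """
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def try_pickle(func):
    """Return 'ok' if func can be pickled, else a short error description."""
    try:
        pickle.dumps(func)
        return "ok"
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "example.py"
    script.write_text(SOURCE)
    module = load_under_synthetic_name(script)

    # bar.__module__ is '__demo_loader__', which pickle cannot import.
    result_unregistered = try_pickle(module.bar)

    # Once the module is reachable by name, pickling by reference works.
    sys.modules[module.__name__] = module
    result_registered = try_pickle(module.bar)
    del sys.modules[module.__name__]

print("not in sys.modules:", result_unregistered)
print("in sys.modules:    ", result_registered)
```

If that assumption holds, it would also explain why running python example.py directly works: in that case bar belongs to __main__, which pickle can always resolve in the parent process.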
Versions
###### prefect versions
2.8.5
2.10.5
###### python versions
3.7.9
Additional context
###
### A complete description of a Prefect Deployment for flow 'foo'
###
name: example
description: null
version: 6c8636a7bb4d72afb9b92c3025675f05
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides: {}
###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: foo
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: 762ff9aa-b9e3-4e34-b047-7a26018db371
  _block_document_name: anonymous-617e9166-a2a4-48cc-94b5-4c36a7fe7276
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage: null
path: /home/ubuntu/test
entrypoint: example.py:foo
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    a:
      title: a
      position: 0
  required:
  - a
  definitions: null
timestamp: '2023-04-20T07:01:21.085073+00:00'