import of module '__prefect_loader__' failed when 'multiprocessing.Pool.apply_async' is used within the flow via remote deployments

Open rsampaths16 opened this issue 1 year ago • 9 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Invoked directly

If the flow is invoked directly with python example.py, it runs successfully without any issues.

Invoked via agent through a deployment

If a deployment is created and the flow run is executed by an agent, every apply_async call fails: the function bar cannot be pickled because the import of the module __prefect_loader__ fails.

Reproduction

from prefect import flow
from multiprocessing import Pool
import time


@flow
def foo(a):
    p = Pool(4)
    for i in range(a):
        p.apply_async(bar, (i+1,), callback=ok, error_callback=error)
    p.close()
    p.join()


def ok(x):
    print(f"I am ok {x}")


def error(x):
    print(f"I am error {x}")


def bar(b):
    print('hi')
    time.sleep(b)
    print('bye')


if __name__ == '__main__':
    foo(5)

Error

I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
I am error Can't pickle <function bar at 0x7f5c39a97680>: import of module '__prefect_loader__' failed
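
For anyone triaging this, the message above looks like ordinary pickle behaviour rather than anything specific to Pool: a module-level function is pickled by reference (module name plus function name), so pickling fails when the module named in the function's __module__ cannot be imported. Below is a minimal sketch that reproduces that situation without Prefect. It assumes the deployment loads example.py under the synthetic module name __prefect_loader__ and that this name is not importable at the time apply_async pickles bar. The helpers load_script and try_pickle are hypothetical names for illustration, not Prefect APIs.

```python
import importlib.util
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for example.py: a single module-level function.
SCRIPT = "def bar(b):\n    return b\n"


def load_script(path, name="__prefect_loader__"):
    """Load a script under a synthetic module name WITHOUT registering it
    in sys.modules (an assumption about how the deployment loads the flow)."""
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def try_pickle(obj):
    """Return (True, None) if obj pickles, otherwise (False, error text)."""
    try:
        pickle.dumps(obj)
        return True, None
    except Exception as exc:  # the exact exception text varies by Python version
        return False, f"{type(exc).__name__}: {exc}"


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.py")
    with open(path, "w") as f:
        f.write(SCRIPT)
    module = load_script(path)

    # The module name cannot be imported, so pickling by reference fails.
    unregistered = try_pickle(module.bar)

    # Once the name resolves through sys.modules, pickling succeeds.
    sys.modules[module.__name__] = module
    try:
        registered = try_pickle(module.bar)
    finally:
        del sys.modules[module.__name__]

print(module.bar.__module__)
print(unregistered)
print(registered)
```

If this assumption holds, it would also explain why running python example.py directly works: there the function's module is __main__, which is always resolvable in the parent process. The sys.modules registration is only there to show the mechanism and is not meant as a recommended fix.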

Versions

###### prefect versions
2.8.5
2.10.5

###### python versions
3.7.9

Additional context

###
### A complete description of a Prefect Deployment for flow 'foo'
###
name: example
description: null
version: 6c8636a7bb4d72afb9b92c3025675f05
# The work queue that will handle this deployment's runs
work_queue_name: test
work_pool_name: default-agent-pool
tags: []
parameters: {}
schedule: null
is_schedule_active: true
infra_overrides: {}

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: foo
manifest_path: null
infrastructure:
  type: process
  env: {}
  labels: {}
  name: null
  command: null
  stream_output: true
  working_dir: null
  _block_document_id: 762ff9aa-b9e3-4e34-b047-7a26018db371
  _block_document_name: anonymous-617e9166-a2a4-48cc-94b5-4c36a7fe7276
  _is_anonymous: true
  block_type_slug: process
  _block_type_slug: process
storage: null
path: /home/ubuntu/test
entrypoint: example.py:foo
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    a:
      title: a
      position: 0
  required:
  - a
  definitions: null
timestamp: '2023-04-20T07:01:21.085073+00:00'

rsampaths16 • Apr 26 '23 06:04