Execute parallel tasks with result_storage
Bug summary
When I specify result_storage for a flow that executes tasks in parallel, I get an error:
from prefect import flow, task, get_run_logger
from prefect.futures import wait
from prefect_dask import DaskTaskRunner

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(task_runner=DaskTaskRunner(), result_storage='s3-bucket/minio-results')
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(1):
        futures.append(task.submit(i))
    wait(futures)

if __name__ == "__main__":
    test_flow()
The error is:
20:36:10.969 | WARNING | distributed.worker - Compute Failed
Key: task-3d1f2a69c05629d3d7bde9aaf6d1c46d
Function: task
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x10c7662f0>, 'task_run_id': UUID('7c1cec53-f50f-47f2-a8e1-3cc07d04a4f5'), 'context': {'flow_run_context': {'flow': <prefect.flows.Flow object at 0x10c7667a0>, 'flow_run': {'id': UUID('72c623f2-2bbc-4b7f-9640-d75501e623ad'), 'created': DateTime(2024, 10, 23, 17, 36, 8, 974120, tzinfo=Timezone('UTC')), 'updated': DateTime(2024, 10, 23, 17, 36, 9, 29944, tzinfo=Timezone('UTC')), 'name': 'eccentric-lyrebird', 'flow_id': UUID('eb50450b-8733-4a6b-84b6-8e1310946649'), 'state_id': UUID('758cf05c-9f66-47a0-b036-ba1126a176d7'), 'deployment_id': None, 'deployment_version': None, 'work_queue_name': None, 'flow_version': 'da0a46cbcaac48bd36b7f83805b73cde', 'parameters': {}, 'idempotency_key': None, 'context': {}, 'empirical_policy': {'max_retries': 0, 'retry_delay_seconds': 0.0, 'retries': 0, 'retry_delay': 0, 'pause_keys': set(), 'resuming': False}, 'tags': [], 'parent_task_run_id': None, 'run_count': 1, 'expected_start_time': DateTime(2024, 10, 23, 17, 36,
Exception: 'KeyError("No class found for dispatch key \'s3-bucket\' in registry for type \'WritableFileSystem\'.")'
If I remove the task execution, then all is fine and I can see persisted results in the bucket:
from prefect import flow, task, get_run_logger
from prefect.futures import wait
from prefect_dask import DaskTaskRunner

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(task_runner=DaskTaskRunner(), result_storage='s3-bucket/minio-results')
def test_flow():
    get_run_logger().info('test_flow')
    # futures = []
    # for i in range(1):
    #     futures.append(task.submit(i))
    #
    # wait(futures)

if __name__ == "__main__":
    test_flow()
The result is:
20:41:53.629 | INFO | Flow run 'phenomenal-antelope' - test_flow
If I keep the task execution but remove the result_storage argument, it works just fine:
from prefect import flow, task, get_run_logger
from prefect.futures import wait
from prefect_dask import DaskTaskRunner

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(task_runner=DaskTaskRunner())
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(1):
        futures.append(task.submit(i))
    wait(futures)

if __name__ == "__main__":
    test_flow()
Result:
20:37:53.420 | INFO | Flow run 'lavender-chupacabra' - test_flow
20:37:53.442 | INFO | distributed.scheduler - Receive client connection: Client-worker-8830a3c0-9165-11ef-a077-7ae6abfb717e
20:37:53.442 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:51752
20:37:53.443 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://127.0.0.1:51726
20:37:53.445 | INFO | distributed.scheduler - Receive client connection: PrefectDaskClient-8831630a-9165-11ef-a077-7ae6abfb717e
20:37:53.446 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:51755
20:37:53.478 | INFO | Task run 'task-bd4' - task 0
20:37:53.483 | INFO | Task run 'task-bd4' - Finished in state Completed()
If I switch to remote file system result storage, then the error is different:
from prefect import flow, task, get_run_logger
from prefect.futures import wait
from prefect_dask import DaskTaskRunner

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(task_runner=DaskTaskRunner(), result_storage='remote-file-system/minio-results')
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(1):
        futures.append(task.submit(i))
    wait(futures)

if __name__ == "__main__":
    test_flow()
Exception: "1 validation error for EngineContext\nresult_store.result_storage.basepath\n Field required [type=missing, input_value={'block_type_slug': 'remo... '_is_anonymous': False}, input_type=dict]\n For further information visit https://errors.pydantic.dev/2.9/v/missing"
Version info (prefect version output)
Version: 3.0.10
API version: 0.8.4
Python version: 3.10.10
Git commit: 3aa2d893
Built: Tue, Oct 15, 2024 1:31 PM
OS/Arch: darwin/arm64
Profile: pd-flow-local
Server type: server
Pydantic version: 2.9.2
Integrations:
prefect-aws: 0.5.1
prefect-email: 0.4.0
prefect-dask: 0.3.1
Additional context
Thank you for taking the time to look into this.
Probably I have missed some important information... I can add it anytime.
Another observation:
If I remove the task_runner=DaskTaskRunner() argument, it works:
from prefect import flow, task, get_run_logger
from prefect.futures import wait

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(result_storage='s3-bucket/minio-results')
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(10):
        futures.append(task.submit(i))
    wait(futures)

if __name__ == "__main__":
    test_flow()
The result is as expected:
08:35:45.720 | INFO | prefect.engine - Created flow run 'hilarious-mastodon' for flow 'test-flow'
08:35:45.721 | INFO | prefect.engine - View at http://127.0.0.1:4210/runs/flow-run/099d20b6-15bd-48a8-8823-942715ced1db
08:35:45.979 | INFO | Flow run 'hilarious-mastodon' - test_flow
08:35:46.176 | INFO | Task run 'task-b43' - task 0
08:35:46.189 | INFO | Task run 'task-b43' - Finished in state Completed()
08:35:46.282 | INFO | Task run 'task-48a' - task 2
08:35:46.293 | INFO | Task run 'task-48a' - Finished in state Completed()
08:35:46.333 | INFO | Task run 'task-248' - task 1
08:35:46.344 | INFO | Task run 'task-248' - Finished in state Completed()
08:35:46.383 | INFO | Task run 'task-150' - task 7
08:35:46.395 | INFO | Task run 'task-150' - Finished in state Completed()
08:35:46.506 | INFO | Task run 'task-bd3' - task 9
08:35:46.518 | INFO | Task run 'task-bd3' - Finished in state Completed()
08:35:46.556 | INFO | Task run 'task-ef7' - task 8
08:35:46.568 | INFO | Task run 'task-ef7' - Finished in state Completed()
08:35:46.606 | INFO | Task run 'task-af4' - task 5
08:35:46.618 | INFO | Task run 'task-af4' - Finished in state Completed()
08:35:46.724 | INFO | Task run 'task-fff' - task 3
08:35:46.737 | INFO | Task run 'task-fff' - Finished in state Completed()
08:35:46.774 | INFO | Task run 'task-fe5' - task 4
08:35:46.785 | INFO | Task run 'task-fe5' - Finished in state Completed()
08:35:46.814 | INFO | Task run 'task-fa0' - task 6
08:35:46.824 | INFO | Task run 'task-fa0' - Finished in state Completed()
08:35:46.848 | INFO | Flow run 'hilarious-mastodon' - Finished in state Completed()
So is it a Dask issue?
I can confirm that the code above works with prefect 2
Version: 2.20.9
API version: 0.8.4
Python version: 3.10.10
Git commit: b101915a
Built: Tue, Oct 1, 2024 12:41 PM
OS/Arch: darwin/arm64
Profile: pd-flow-local
Server type: server
requirements.txt
prefect~=2.20.9
prefect_dask~=0.2.10
Thanks for opening an issue @cvetelinandreevdreamix! I suspect that prefect-aws isn't available in the Dask execution environment, so Prefect cannot resolve the block slug you provided. I'll have to look into why that is, but could you try this code to see if it works?
from prefect import flow, task, get_run_logger
from prefect.futures import wait
from prefect_dask import DaskTaskRunner
from prefect_aws import S3Bucket

@task()
def task(param):
    get_run_logger().info(f"task {param}")

@flow(task_runner=DaskTaskRunner(), result_storage=S3Bucket.load('minio-results'))
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(1):
        futures.append(task.submit(i))
    wait(futures)

if __name__ == "__main__":
    test_flow()
Hey Alex.
Hold on. I'm trying to migrate to workers on prefect 2.20.9 and then will try to switch to 3 so I can isolate the issue.
P.S. AFAIR the code you provided was giving an error that it could not find the class for slug s3-bucket.
Note that it also gives an error when I use the slug 'remote-file-system/minio-results', which is not part of prefect-aws.
It seems there is a problem with the DaskTaskRunner on 2.20.9.
The following code fails with ModuleNotFoundError: No module named 'common':
from prefect import task, get_run_logger, flow
from prefect_dask import DaskTaskRunner
from common.connectors import smt_common

@task()
def task():
    smt_common()

@flow(task_runner=DaskTaskRunner())
def test_flow():
    get_run_logger().info('test_flow')
    futures = []
    for i in range(10):
        futures.append(task.submit())
    for future in futures:
        future.wait()
    # for i in range(10):
    #     task()
    return futures

if __name__ == "__main__":
    test_flow()
If I don't use DaskTaskRunner as the task runner, or if I don't use futures, it works fine.
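One possible explanation, offered as an assumption rather than a confirmed diagnosis: functions defined in an importable module are normally pickled by reference (module name plus function name), so the process that unpickles them must be able to import that module. Dask serializes work with cloudpickle, which, as far as I know, follows the same by-reference rule for importable modules. The sketch below uses only the standard library `pickle` module to show the effect; `common_demo` is a hypothetical stand-in for the `common` package, and removing the directory from `sys.path` stands in for a worker that cannot see the project code.

```python
# Illustrative sketch using only the standard library. The module name
# `common_demo` is hypothetical and stands in for the project's `common` package.
import importlib
import os
import pickle
import sys
import tempfile

# Create a throwaway module on disk and make it importable.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "common_demo.py"), "w") as fh:
    fh.write("def smt_common():\n    return 'ok'\n")

sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()
common_demo = importlib.import_module("common_demo")

# The pickle payload stores the module and function names, not the code itself.
payload = pickle.dumps(common_demo.smt_common)

# Simulate a worker process whose sys.path lacks the project directory.
sys.path.remove(tmp_dir)
del sys.modules["common_demo"]
importlib.invalidate_caches()

try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = str(exc)

print(error)
```

If this is what happens here, the task only runs on a Dask worker when `common` is importable in the worker's environment, which would be consistent with the flow working when tasks are called directly in the flow process.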
I tried to extract some code for debugging test.zip
Version info (prefect version output)
Version: 2.20.9
API version: 0.8.4
Python version: 3.10.10
Git commit: b101915a
Built: Tue, Oct 1, 2024 12:41 PM
OS/Arch: darwin/arm64
Profile: pd-flow-local
Server type: server
###### Requirements with Version Specifiers ######
prefect~=2.20.9
prefect_dask~=0.2.10
prefect-docker==0.4.0
Note that the flow works if I deploy it with an agent:
from test import test_flow
from prefect.deployments import Deployment
from prefect.filesystems import RemoteFileSystem
from prefect.infrastructure import DockerContainer
import os

deployment = Deployment.build_from_flow(
    name="test_flow",
    flow=test_flow,
    storage=RemoteFileSystem.load('minio'),
    path='test_flow',
    version=os.getenv("GIT_COMMIT_SHA"),
    infrastructure=DockerContainer(
        auto_remove=True,
        image='prefect-default-worker:2.20.9',
        image_pull_policy='NEVER',
        network_mode='bridge',
    ),
)
deployment.apply()
It seems you guys are too busy to fix this and unblock my migration. I would love to use the automation feature in prefect 3.
@desertaxle Hello 👋
I'm also hitting the same error with Dask and a result storage of type GcsBucket from prefect-gcp even though the flow and the dask workers have access to the same deps versions. If I don't define a result storage but if I use dask, everything works fine. And if I don't use dask but define a result storage, it also works fine.
Let me know which tests I should try (locally and in a k8s cluster) and/or whether I should open a new issue.
Here is a reproducible example running locally with latest versions:
Code:
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
from prefect_gcp import GcsBucket
from prefect.futures import wait

@task
def do_stuff():
    return f"Task done"

@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1}
    ),
    result_storage=GcsBucket(
        bucket="my-cache-bucket",
        bucket_folder="cache"
    ).load(name="cache")
)
def dummy():
    wait([do_stuff.submit()])

if __name__ == '__main__':
    dummy()
Versions (also tested with 3.1.5):
Version: 3.1.4
API version: 0.8.4
Python version: 3.11.9
Git commit: 78ee41cb
Built: Wed, Nov 20, 2024 7:37 PM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: ephemeral
Pydantic version: 2.9.2
Server:
Database: sqlite
SQLite version: 3.31.1
Integrations:
prefect-gcp: 0.6.2
prefect-slack: 0.3.1
prefect-dask: 0.3.2
Error:
09:42:24.465 | WARNING | distributed.worker - Compute Failed
Key: do_stuff-0ab8554adecc56701c22e5c66b389275
Function: do_stuff
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x7f0973594fd0>, 'task_run_id': UUID('d3a1c564-ee39-4521-8336-0e8996e2959a'), 'context': {'flow_run_context': {'flow': <prefect.flows.Flow object at 0x7f0973596610>, 'flow_run': {'id': UUID('d33b70a6-1a91-416e-b7d3-ab6058a995eb'), 'created': DateTime(2024, 12, 3, 0, 42, 13, 899257, tzinfo=Timezone('UTC')), 'updated': DateTime(2024, 12, 3, 0, 42, 13, 953000, tzinfo=Timezone('UTC')), 'name': 'ancient-okapi', 'flow_id': UUID('1950e4cb-b01a-41f8-be8d-1edfc147db1c'), 'state_id': UUID('66be69fb-f22b-4758-ad54-20a7806abeec'), 'deployment_id': None, 'deployment_version': None, 'work_queue_name': None, 'flow_version': 'b286ddbfde461df4d140062e9ed8888c', 'parameters': {}, 'idempotency_key': None, 'context': {}, 'empirical_policy': {'max_retries': 0, 'retry_delay_seconds': 0.0, 'retries': 0, 'retry_delay': 0, 'pause_keys': set(), 'resuming': False, 'retry_type': None}, 'tags': [], 'parent_task_run_id': None, 'run_count': 1, 'expected_start_time': DateTime(20
Exception: "1 validation error for EngineContext\nresult_store.result_storage.bucket\n Field required [type=missing, input_value={'block_type_slug': 'gcs-... '_is_anonymous': False}, input_type=dict]\n For further information visit https://errors.pydantic.dev/2.9/v/missing"
It looks like the error happens in the worker when it tries to deserialize the result storage of the flow run context's result store (the bucket field is missing).
If I look at the serialization happening in the prefect dask client (see here), the result store looks like this:
{'cache_result_in_memory': True, 'metadata_storage': {}, 'result_storage': {'_block_document_id': UUID('4fc5bd90-c377-48c4-aa38-cf3837b5b618'), '_block_document_name': 'cache', '_is_anonymous': False, 'block_type_slug': 'gcs-bucket'}}
So no bucket field indeed for the result storage in my case (and for @cvetelinandreevdreamix's case, I guess no basepath is serialized when using prefect_aws instead of prefect_gcp).
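To make the observation above concrete, here is a minimal sketch of what goes wrong when a payload carries only a reference to a saved block (type slug and document name) and the receiving side tries to rebuild the object directly from it. This is not Prefect's code: `FakeBucketBlock`, `dump_reference_only`, and `rebuild` are hypothetical names, and a plain dataclass stands in for the pydantic model, so the failure shows up as a `TypeError` rather than a pydantic validation error.

```python
# Illustrative sketch only; every class and helper name here is hypothetical.
from dataclasses import asdict, dataclass


@dataclass
class FakeBucketBlock:
    bucket: str  # required field, like `bucket` or `basepath` in the errors above
    block_type_slug: str = "gcs-bucket"
    block_document_name: str = ""


def dump_reference_only(block):
    """Serialize only the metadata that identifies a saved block."""
    data = asdict(block)
    return {key: value for key, value in data.items() if key != "bucket"}


def rebuild(payload):
    """Naively rebuild from the payload; fails if a required field is absent."""
    return FakeBucketBlock(**payload)


block = FakeBucketBlock(bucket="my-cache-bucket", block_document_name="cache")
payload = dump_reference_only(block)

try:
    rebuild(payload)
    error = None
except TypeError as exc:
    error = str(exc)

print(payload)
print(error)
```

In this toy model the receiver would need to either get the full field values in the payload or use the reference to load the saved document before constructing the object.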
Hope this helps!
And to add additional information on the other error, I'm also experiencing the KeyError exception when using the result_storage as a string rather than a GcsBucket instance (same is happening when using the PREFECT_RESULTS_DEFAULT_STORAGE_BLOCK and PREFECT_RESULTS_PERSIST_BY_DEFAULT env vars).
With result storage as gcs-bucket/cache:
11:26:50.036 | WARNING | distributed.worker - Compute Failed
Key: do_stuff-b119da3353a9867c372768f5983e746c
Function: do_stuff
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x7f4700846f10>, 'task_run_id': UUID('36d06690-1653-46c9-b1c0-f96b5ae3cf0e'), 'context': {'flow_run_context': {'flow': <prefect.flows.Flow object at 0x7f4700881a10>, 'flow_run': {'id': UUID('f0511c87-a91c-4cc8-88dd-f0be415bd530'), 'created': DateTime(2024, 12, 3, 2, 26, 31, 536187, tzinfo=Timezone('UTC')), 'updated': DateTime(2024, 12, 3, 2, 26, 31, 605000, tzinfo=Timezone('UTC')), 'name': 'towering-gecko', 'flow_id': UUID('1950e4cb-b01a-41f8-be8d-1edfc147db1c'), 'state_id': UUID('ebd3a5f8-b4e4-4c5b-b85b-ee4650051b7e'), 'deployment_id': None, 'deployment_version': None, 'work_queue_name': None, 'flow_version': '7de5400043e3aa6fdc252508ee884975', 'parameters': {}, 'idempotency_key': None, 'context': {}, 'empirical_policy': {'max_retries': 0, 'retry_delay_seconds': 0.0, 'retries': 0, 'retry_delay': 0, 'pause_keys': set(), 'resuming': False, 'retry_type': None}, 'tags': [], 'labels': {'prefect.flow.id': '1950e4cb-b01a-41f8-be8d-1edfc147db1c'}, 'paren
Exception: 'KeyError("No class found for dispatch key \'gcs-bucket\' in registry for type \'WritableFileSystem\'.")'
Looks like the GcsBucket class is not registered in the registry for WritableFileSystem on the worker side?
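A minimal sketch of that hypothesis, assuming the lookup behaves like a slug-keyed class registry that is populated as a side effect of defining (importing) the block class. It is not Prefect's implementation: `REGISTRY`, `FileSystemBase`, `resolve`, and `FakeGcsBucket` are hypothetical names used for illustration only.

```python
# Illustrative sketch only: a slug-keyed class registry, not Prefect's actual code.
REGISTRY = {}


class FileSystemBase:
    slug = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # A subclass is registered only when its class body runs, i.e. when
        # the module that defines it has been imported in this process.
        if cls.slug is not None:
            REGISTRY[cls.slug] = cls


def resolve(storage):
    """Split 'type-slug/block-name' and look up the class for the slug."""
    slug, _, name = storage.partition("/")
    try:
        return REGISTRY[slug], name
    except KeyError:
        raise KeyError(f"No class found for dispatch key {slug!r}") from None


# Before the defining code has run, the slug cannot be resolved.
try:
    resolve("gcs-bucket/cache")
    resolved_before = True
except KeyError:
    resolved_before = False


class FakeGcsBucket(FileSystemBase):  # stands in for importing the integration
    slug = "gcs-bucket"


cls, name = resolve("gcs-bucket/cache")
print(resolved_before, cls.__name__, name)
```

Under this model, a worker process that never imports the integration package would fail the lookup even when the package is installed, which would match both failing with a string slug and working in the flow process where the class has been imported.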
Thanks for the info @john-jam and @cvetelinandreevdreamix! I believe I have a fix for the issues you're seeing in https://github.com/PrefectHQ/prefect/pull/16189. If you could try out the changes when you get a chance, it would be super helpful!
Awesome news @desertaxle!
Will this fix be included in the next nightly build 3.1.6.dev2? I'll try to test this from the main branch in the meantime.
hi @john-jam
"Will this fix be included in the next nightly build 3.1.6.dev2?"
yes!
@desertaxle Tested with the updates from your branch on my local env and it seems like it does the trick! Thanks for the quick fix.
Hi all. @desertaxle Thanks for the fix. I can confirm that the slug issue is fixed.
However, there are a few more issues. I'm attaching a project you can use to reproduce them: test-prefect-migration copy.zip
2024-12-06 12:06:12 ModuleNotFoundError: No module named 'common'
It happens when:
- I deploy the flow to Minio (use python deployment.py).
- I use a function from another module in the task I want to run with Dask.
If I run the flow from the local file system (python deployment.py), then it works fine.
If I deploy the flow and don't use a function imported from another module, then it works fine.
My case is that I have code shared between multiple flows, and it is extracted into a shared module (common).
I can see the common folder is deployed to Minio.
- The deployment of the flow to Minio doesn't work if I define the flow with the RemoteFileSystem class:
@flow(task_runner=DaskTaskRunner(), result_storage=RemoteFileSystem.load('minio-results'))
It works if I give just the slug:
@flow(task_runner=DaskTaskRunner(), result_storage='remote-file-system/minio-results')
Let me know if you need more information. I need to migrate to Prefect 3 and use the automations.
Thank you for your time!