Creating flow run from deployment fails with Pydantic 2 parameters
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar issue and didn't find it.
- [X] I searched the Prefect documentation for this issue.
- [X] I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
When a flow takes a Pydantic 2 model as a parameter and is run from a deployment, orjson fails to serialize the model.
This is similar to https://github.com/PrefectHQ/prefect/issues/9536, except that the parameter here is not a custom class but a plain Pydantic model, which Prefect appears to support. This issue is blocking my switch to Pydantic 2.
Reproduction
def test_prefect_pydantic_serialization():
    from prefect import Flow, flow
    from prefect.deployments import Deployment, run_deployment
    from pydantic import BaseModel  # pydantic 2!

    class Parameters(BaseModel):
        param: int = 1

    @flow
    def my_flow(parameters: Parameters):
        pass

    parameters = Parameters(param=2)

    # Works
    my_flow(parameters=parameters)

    # Fails
    deployment = Deployment.build_from_flow(flow=my_flow, name="test")
    deployment_id = deployment.apply(upload=True)
    deployment_name = f"{deployment.flow_name}/{deployment.name}"
    run_deployment(name=deployment_name, parameters=dict(parameters=parameters))
Error
src/spacem-batch/spacem_batch/tests/workflow_spacem_reprocessing_test.py:185 (test_prefect_pydantic_serialization)
TypeError: Object of type 'Parameters' is not JSON serializable
The above exception was the direct cause of the following exception:
    def test_prefect_pydantic_serialization():
        from prefect import Flow, flow
        from prefect.deployments import Deployment, run_deployment
        from pydantic import BaseModel  # pydantic 2!

        class Parameters(BaseModel):
            param: int = 1

        @flow
        def my_flow(parameters: Parameters):
            pass

        parameters = Parameters(param=2)

        # Works
        my_flow(parameters=parameters)

        # Fails
        deployment = Deployment.build_from_flow(flow=my_flow, name="test")
        deployment_id = deployment.apply(upload=True)
        deployment_name = f"{deployment.flow_name}/{deployment.name}"
>       run_deployment(name=deployment_name, parameters=dict(parameters=parameters))
workflow_spacem_reprocessing_test.py:207:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
…/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:259: in coroutine_wrapper
return call()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:431: in __call__
return self.result()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:317: in result
return self.future.result(timeout=timeout)
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:178: in result
return self.__get_result()
…/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
raise self._exception
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:388: in _run_async
result = await coro
…/lib/python3.10/site-packages/prefect/client/utilities.py:51: in with_injected_client
return await fn(*args, **kwargs)
…/lib/python3.10/site-packages/prefect/deployments/deployments.py:188: in run_deployment
flow_run = await client.create_flow_run_from_deployment(
…/lib/python3.10/site-packages/prefect/client/orchestration.py:560: in create_flow_run_from_deployment
json=flow_run_create.dict(json_compatible=True, exclude_unset=True),
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:157: in dict
return json.loads(self.json(*args, **kwargs))
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:111: in json
return super().json(*args, **kwargs)
…/lib/python3.10/site-packages/pydantic/v1/main.py:504: in json
return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
v = {'context': {}, 'idempotency_key': None, 'name': 'blond-ibex', 'parameters': {'parameters': Parameters(param=2)}, ...}
    def orjson_dumps_extra_compatible(v: Any, *, default: Any) -> str:
        """
        Utility for dumping a value to JSON using orjson, but allows for
        1) non-string keys: this is helpful for situations like pandas dataframes,
        which can result in non-string keys
        2) numpy types: for serializing numpy arrays
        orjson.dumps returns bytes, to match standard json.dumps we need to decode.
        """
>       return orjson.dumps(
            v, default=default, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
        ).decode()
E       TypeError: Type is not JSON serializable: Parameters
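The trace passes through `pydantic/v1/main.py`, so one plausible reading is that the `default` encoder handed to orjson only recognises Pydantic 1 models and rejects the Pydantic 2 `Parameters` instance. The stdlib-only sketch below illustrates that mechanism with `json.dumps`; `V1Base`, `V2Base`, `OldParams`, `NewParams`, and `v1_only_encoder` are hypothetical stand-ins, not Prefect's or Pydantic's actual code.

```python
import json


class V1Base:
    """Stand-in for the model base class the encoder knows about."""

    def dict(self):
        return vars(self)


class V2Base:
    """Stand-in for a model base class from a different major version."""

    def model_dump(self):
        return vars(self)


class OldParams(V1Base):
    def __init__(self, param):
        self.param = param


class NewParams(V2Base):
    def __init__(self, param):
        self.param = param


def v1_only_encoder(obj):
    # Called by json.dumps for any object it cannot serialize natively.
    # Only V1Base instances are handled; everything else is rejected.
    if isinstance(obj, V1Base):
        return obj.dict()
    raise TypeError(f"Object of type '{type(obj).__name__}' is not JSON serializable")


# The known base class serializes fine.
print(json.dumps({"parameters": OldParams(2)}, default=v1_only_encoder))

# The unknown base class falls through the encoder and raises TypeError.
try:
    json.dumps({"parameters": NewParams(2)}, default=v1_only_encoder)
except TypeError as exc:
    print(f"TypeError: {exc}")
```

If this reading is right, the fix would belong in the encoder (teaching it about Pydantic 2 models) rather than in user code.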
Versions
Version: 2.16.3
API version: 0.8.4
Python version: 3.10.13
Git commit: e3f02c00
Built: Thu, Mar 7, 2024 4:56 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
  Database: sqlite
  SQLite version: 3.41.2
Additional context
prefect==2.16.3  # same for prefect==2.14.20, prefect==2.15.0
pydantic==2.6.3  # same for pydantic==2.0.0
orjson==3.9.15
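A possible workaround until this is fixed is to convert the model to JSON-compatible data before handing it to `run_deployment`. The sketch below is a minimal, unverified example: `to_jsonable` is a hypothetical helper (not part of Prefect), and it assumes that anything exposing a `model_dump` method is a Pydantic 2 model. `model_dump(mode="json")` is the Pydantic 2 call that returns only JSON-compatible types.

```python
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Recursively replace Pydantic 2 models with JSON-compatible data.

    Hypothetical helper, not part of Prefect. Any object exposing a
    ``model_dump`` method is treated as a Pydantic 2 model.
    """
    if hasattr(value, "model_dump"):
        # mode="json" asks Pydantic 2 to emit only JSON-compatible types.
        return value.model_dump(mode="json")
    if isinstance(value, dict):
        return {key: to_jsonable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(item) for item in value]
    return value
```

In the reproduction above, this would be used as `run_deployment(name=deployment_name, parameters=to_jsonable(dict(parameters=parameters)))`. The flow would then receive a plain dict for `parameters`; whether Prefect coerces it back into `Parameters` from the type hint is an assumption I have not verified for these versions.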