
Creating flow run from deployment fails with Pydantic 2 parameters

Open aeisenbarth opened this issue 11 months ago • 4 comments

First check

  • [X] I added a descriptive title to this issue.
  • [X] I used the GitHub search to find a similar issue and didn't find it.
  • [X] I searched the Prefect documentation for this issue.
  • [X] I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When a flow takes a Pydantic 2 model as a parameter and a run is created from a deployment with run_deployment, orjson fails to serialize the model instance.

This is similar to https://github.com/PrefectHQ/prefect/issues/9536, except that the parameter is not a custom class but a plain Pydantic model, which Prefect appears to support (calling the flow directly works). This issue is blocking my switch to Pydantic 2.

Reproduction

def test_prefect_pydantic_serialization():
    from prefect import flow
    from prefect.deployments import Deployment, run_deployment
    from pydantic import BaseModel  # pydantic 2!

    class Parameters(BaseModel):
        param: int = 1

    @flow
    def my_flow(parameters: Parameters):
        pass

    parameters = Parameters(param=2)

    # Works: calling the flow directly accepts the model instance
    my_flow(parameters=parameters)

    # Fails: run_deployment cannot serialize the model instance
    deployment = Deployment.build_from_flow(flow=my_flow, name="test")
    deployment_id = deployment.apply(upload=True)
    deployment_name = f"{deployment.flow_name}/{deployment.name}"
    run_deployment(name=deployment_name, parameters=dict(parameters=parameters))
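One possible workaround is to convert model instances to JSON-compatible data before calling run_deployment. This is a minimal sketch that assumes the flow's parameter validation rebuilds `Parameters` from a plain dict when the deployment runs, which is worth verifying on the Prefect version in use. `to_json_compatible` is a hypothetical helper, not part of Prefect. It only relies on Pydantic 2's `model_dump(mode="json")`.

```python
from typing import Any


def to_json_compatible(value: Any) -> Any:
    """Hypothetical helper: recursively replace Pydantic-2-style models
    (instances with a callable model_dump) by their JSON-mode dumps."""
    model_dump = getattr(value, "model_dump", None)
    # Skip classes: model_dump on a class is an unbound method.
    if callable(model_dump) and not isinstance(value, type):
        return model_dump(mode="json")
    if isinstance(value, dict):
        return {key: to_json_compatible(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_json_compatible(item) for item in value]
    return value


# Usage with the reproduction above (needs Prefect and a reachable API):
# run_deployment(
#     name=deployment_name,
#     parameters=to_json_compatible(dict(parameters=parameters)),
# )
```

With this approach the model instance never reaches the orjson serializer; only a dict such as {"param": 2} is sent.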

Error

src/spacem-batch/spacem_batch/tests/workflow_spacem_reprocessing_test.py:185 (test_prefect_pydantic_serialization)
TypeError: Object of type 'Parameters' is not JSON serializable

The above exception was the direct cause of the following exception:

    def test_prefect_pydantic_serialization():
        from prefect import Flow, flow
        from prefect.deployments import Deployment, run_deployment
        from pydantic import BaseModel  # pydantic 2!
    
        class Parameters(BaseModel):
            param: int = 1
    
        @flow
        def my_flow(parameters: Parameters):
            pass
    
        parameters = Parameters(param=2)
    
        # Works
        my_flow(parameters=parameters)
    
        # Fails
        deployment = Deployment.build_from_flow(flow=my_flow, name="test")
        deployment_id = deployment.apply(upload=True)
        deployment_name = f"{deployment.flow_name}/{deployment.name}"
>       run_deployment(name=deployment_name, parameters=dict(parameters=parameters))

workflow_spacem_reprocessing_test.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
…/lib/python3.10/site-packages/prefect/utilities/asyncutils.py:259: in coroutine_wrapper
    return call()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:431: in __call__
    return self.result()
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:317: in result
    return self.future.result(timeout=timeout)
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:178: in result
    return self.__get_result()
…/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
    raise self._exception
…/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py:388: in _run_async
    result = await coro
…/lib/python3.10/site-packages/prefect/client/utilities.py:51: in with_injected_client
    return await fn(*args, **kwargs)
…/lib/python3.10/site-packages/prefect/deployments/deployments.py:188: in run_deployment
    flow_run = await client.create_flow_run_from_deployment(
…/lib/python3.10/site-packages/prefect/client/orchestration.py:560: in create_flow_run_from_deployment
    json=flow_run_create.dict(json_compatible=True, exclude_unset=True),
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:157: in dict
    return json.loads(self.json(*args, **kwargs))
…/lib/python3.10/site-packages/prefect/_internal/schemas/bases.py:111: in json
    return super().json(*args, **kwargs)
…/lib/python3.10/site-packages/pydantic/v1/main.py:504: in json
    return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

v = {'context': {}, 'idempotency_key': None, 'name': 'blond-ibex', 'parameters': {'parameters': Parameters(param=2)}, ...}

    def orjson_dumps_extra_compatible(v: Any, *, default: Any) -> str:
        """
        Utility for dumping a value to JSON using orjson, but allows for
        1) non-string keys: this is helpful for situations like pandas dataframes,
        which can result in non-string keys
        2) numpy types: for serializing numpy arrays
    
        orjson.dumps returns bytes, to match standard json.dumps we need to decode.
        """
>       return orjson.dumps(
            v, default=default, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
        ).decode()
E       TypeError: Type is not JSON serializable: Parameters
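The traceback goes through pydantic/v1/main.py, which suggests that the `default` encoder handed to orjson is the Pydantic v1 one. That encoder only recognizes v1 models, so a Pydantic 2 model falls through to a TypeError. The first message above has the same format as that encoder's error. The sketch below reproduces this mechanism with the standard json module, which uses the same `default=` hook protocol as orjson.dumps. It also shows one kind of fallback that would avoid the failure. `V1Model`, `V2Model`, `v1_style_encoder` and `with_model_dump_fallback` are hypothetical stand-ins, not Prefect or Pydantic code, and the fallback is not Prefect's actual fix.

```python
import json
from typing import Any, Callable


class V1Model:
    """Hypothetical stand-in for a model the v1 encoder knows about."""

    def __init__(self, **data: Any) -> None:
        self.data = data

    def dict(self) -> dict:
        return dict(self.data)


class V2Model:
    """Hypothetical stand-in for a Pydantic 2 model: exposes model_dump()."""

    def __init__(self, **data: Any) -> None:
        self.data = data

    def model_dump(self, mode: str = "python") -> dict:
        return dict(self.data)


def v1_style_encoder(obj: Any) -> Any:
    # Mimics a v1-style fallback encoder: it only recognizes its own
    # model class and raises TypeError for anything else.
    if isinstance(obj, V1Model):
        return obj.dict()
    raise TypeError(f"Object of type '{obj.__class__.__name__}' is not JSON serializable")


def with_model_dump_fallback(default: Callable[[Any], Any]) -> Callable[[Any], Any]:
    # Wrap an existing `default` hook so objects exposing a Pydantic-2-style
    # model_dump() are converted to JSON-compatible data first.
    def hook(obj: Any) -> Any:
        model_dump = getattr(obj, "model_dump", None)
        if callable(model_dump):
            return model_dump(mode="json")
        return default(obj)

    return hook


payload = {"parameters": {"parameters": V2Model(param=2)}}

try:
    json.dumps(payload, default=v1_style_encoder)
except TypeError as exc:
    print(exc)  # Object of type 'V2Model' is not JSON serializable

print(json.dumps(payload, default=with_model_dump_fallback(v1_style_encoder)))
```

The wrapper tries the Pydantic 2 path first and delegates everything else to the original encoder, so types the v1 encoder already handles keep working.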

Versions

Version:             2.16.3
API version:         0.8.4
Python version:      3.10.13
Git commit:          e3f02c00
Built:               Thu, Mar 7, 2024 4:56 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.41.2

Additional context

prefect==2.16.3  # same for prefect==2.14.20, prefect==2.15.0
pydantic==2.6.3  # same for pydantic==2.0.0
orjson==3.9.15

aeisenbarth, Mar 13 '24 10:03