Custom Result Serializer deserialize method is not used when loading result from flow run

Open · peterroelants opened this issue 3 years ago · 6 comments

Description

I have a Task with a PrefectResult that uses a custom serializer that is a subclass of JSONSerializer:

@prefect.task(result=PrefectResult(serializer=CustomJsonSerializer()))

When I run a flow with this Task via StartFlowRun and fetch the result via prefect.Client, the result is not deserialized by my custom serializer but by the default JSONSerializer.

Expected Behavior

I would expect a custom serializer added to a Prefect Result to be used for deserialization when loading the result.

Reproduction

I've tried to make a minimal toy example reproducing the issue:

The following code:

from dataclasses import dataclass

import prefect
from prefect.engine import signals, state
from prefect.engine.results.prefect_result import PrefectResult
from prefect.engine.serializers import JSONSerializer
from prefect.tasks.prefect import StartFlowRun


@dataclass
class ResultClass:
    test_var: int


class CustomJsonSerializer(JSONSerializer):
    def serialize(self, value: ResultClass) -> bytes:
        return str(value.test_var).encode()

    def deserialize(self, value: bytes) -> ResultClass:
        return ResultClass(test_var=int(value.decode()))


@prefect.task(result=PrefectResult(serializer=CustomJsonSerializer()))
def test_task() -> ResultClass:
    return ResultClass(test_var=42)


def main():
    # Create flow and register
    with prefect.Flow("test-flow") as flow:
        res = test_task()
    flow.register(project_name="test_project")
    # Run flow
    flow_run = StartFlowRun(
        flow_name="test-flow",
        project_name="test_project",
        wait=True,
    )
    try:
        flow_run.run()
    except signals.SUCCESS as exc:
        flow_state = exc.state
    assert isinstance(flow_state, state.Success)
    # Parse result
    flow_run_id = str(flow_state.result).split(" ")[0]
    client = prefect.Client()
    flow_run_info = client.get_flow_run_info(flow_run_id)
    task_run = flow_run_info.task_runs[0]
    task_result = task_run.state.load_result(task_run.state._result).result
    print("task_result: ", task_result)
    print("type(task_result): ", type(task_result))
    assert isinstance(task_result, ResultClass)


if __name__ == "__main__":
    main()

Has as result:

task_result:  42
type(task_result):  <class 'int'>
Traceback (most recent call last):
  File "/home/peter/workspace/sandbox/tmp/deserialize_prefect.py", line 56, in <module>
    main()
  File "/home/peter/workspace/sandbox/tmp/deserialize_prefect.py", line 52, in main
    assert isinstance(task_result, ResultClass)
AssertionError

Note that the result is an int rather than the ResultClass instance that the custom deserialize method would return.

Environment

{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "Linux-5.10.0-1029-oem-x86_64-with-glibc2.31",
    "prefect_backend": "server",
    "prefect_version": "0.14.21",
    "python_version": "3.9.4"
  }
}

peterroelants · Jun 08 '21 18:06

Hey @peterroelants, thanks for the thorough report.

This is a bit tricky: the serialized Result type in the backend does not include the serializer attribute, so when you retrieve your task run state from there you're receiving a PrefectResult instance without your custom serializer. During flow execution, the Result instance is pulled directly from the Task, so we can fully support custom result types and serializers (which is why it is written to the database correctly). This means that within the context of flow run execution, it will also be read correctly from the database. Pulling the task state from the database and loading the result manually will not work, as you've seen.
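A minimal stdlib-only model of the round trip described above may make the mechanism concrete. `StoredResult`, `DefaultJsonSerializer`, `to_backend` and `from_backend` are hypothetical stand-ins, not Prefect APIs; the sketch only assumes, as explained above, that the backend record keeps a PrefectResult's location (the serialized value) but not its serializer.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ResultClass:
    test_var: int


class DefaultJsonSerializer:
    """Hypothetical stand-in for the default JSON serializer."""

    def serialize(self, value: Any) -> bytes:
        return json.dumps(value).encode()

    def deserialize(self, value: bytes) -> Any:
        return json.loads(value.decode())


class CustomJsonSerializer(DefaultJsonSerializer):
    """Mirrors the custom serializer from the reproduction."""

    def serialize(self, value: ResultClass) -> bytes:
        return str(value.test_var).encode()

    def deserialize(self, value: bytes) -> ResultClass:
        return ResultClass(test_var=int(value.decode()))


@dataclass
class StoredResult:
    """Hypothetical model of a PrefectResult: the location holds the payload."""

    location: str
    serializer: DefaultJsonSerializer

    def read(self) -> Any:
        return self.serializer.deserialize(self.location.encode())


def to_backend(result: StoredResult) -> Dict[str, str]:
    # Only the location is persisted; the serializer attribute is dropped.
    return {"type": "PrefectResult", "location": result.location}


def from_backend(record: Dict[str, str]) -> StoredResult:
    # The original serializer is unknown here, so the default one is used.
    return StoredResult(location=record["location"], serializer=DefaultJsonSerializer())


custom = CustomJsonSerializer()
written = StoredResult(custom.serialize(ResultClass(42)).decode(), custom)
in_run = written.read()  # read with the task's own Result, as during the run
after_round_trip = from_backend(to_backend(written)).read()
print(type(in_run).__name__, type(after_round_trip).__name__)  # ResultClass int
```

Read with the task's own Result, the payload `"42"` comes back as a `ResultClass`; read after the backend round trip, plain JSON decoding turns it into the int `42`, matching the output in the report.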

To make this work as expected we'll have to:

  • Add ResultSerializerSchema and sub-schemas to prefect.serialization.results
  • Add a CustomSerializerSchema to serialize custom Serializer subtypes into qualified names and deserialize by import
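The second bullet amounts to storing an importable path for the serializer class and importing it back on load. A rough stdlib sketch of that idea, using hypothetical helpers (`qualified_name`, `load_by_qualified_name`) that are not part of Prefect:

```python
import importlib
import json
from typing import Any


def qualified_name(cls: type) -> str:
    """Return an importable 'module.QualName' path for a class."""
    return f"{cls.__module__}.{cls.__qualname__}"


def load_by_qualified_name(path: str) -> Any:
    """Import the longest importable module prefix of `path`, then walk attributes."""
    parts = path.split(".")
    for split in range(len(parts) - 1, 0, -1):
        try:
            obj: Any = importlib.import_module(".".join(parts[:split]))
        except ImportError:
            continue  # this prefix is not a module; try a shorter one
        for attr in parts[split:]:
            obj = getattr(obj, attr)  # also handles nested classes like Outer.Inner
        return obj
    raise ImportError(f"Cannot import {path!r}")


name = qualified_name(json.JSONDecoder)
print(name)  # json.decoder.JSONDecoder
assert load_by_qualified_name(name) is json.JSONDecoder
```

This only restores the class. A serializer that takes constructor arguments (for example one that stores a target model type) would also need those arguments recorded, the class has to be importable (not defined in `__main__`) wherever the result is read, and importing names taken from stored data should only be done for trusted records.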

This is a pretty significant change so I'm not sure when we will be able to do this. At the very least, we should add the name of the serializer to the ResultSchema so we can warn when someone tries to use it this way.
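The lighter-weight option, recording the serializer's name so a mismatch can at least be flagged, could look roughly like this. `dump_result_record` and `check_reader` are hypothetical helpers and the two serializer classes are empty stand-ins; the only assumption is that the stored record gains a `serializer` field.

```python
import warnings
from typing import Any, Dict


class DefaultJsonSerializer:
    """Hypothetical stand-in for the default serializer."""


class CustomJsonSerializer(DefaultJsonSerializer):
    """Hypothetical custom subclass."""


def dump_result_record(location: str, serializer: Any) -> Dict[str, str]:
    # Store the serializer's class name next to the location.
    return {"location": location, "serializer": type(serializer).__name__}


def check_reader(record: Dict[str, str], reader_serializer: Any) -> bool:
    """Warn and return False if the reading serializer differs from the writing one."""
    written_with = record.get("serializer")
    reading_with = type(reader_serializer).__name__
    if written_with is not None and written_with != reading_with:
        warnings.warn(
            f"Result was written with {written_with} but is being read with "
            f"{reading_with}; the value may not deserialize as expected."
        )
        return False
    return True


record = dump_result_record("42", CustomJsonSerializer())
matches = check_reader(record, DefaultJsonSerializer())  # emits a UserWarning
```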

zanieb · Jun 08 '21 20:06

I see, so if I understand correctly the state.load_result method calls PrefectResult.read. However, since the flow run (and its state and results) are fetched via client.get_flow_run_info, the result serializer gets lost because it is not stored on the backend itself. PrefectResult falls back on its default JSONSerializer, thus leading to the observed behavior?

So we need to serialize the serializers, so they can be deserialized to deserialize the result?

peterroelants · Jun 09 '21 06:06

Btw, to give more detail, I'm currently using this to serialize/deserialize pydantic objects that I use as the data model between my tasks/flows. For example, this is the custom serializer that I'm using at the moment:

from typing import Generic, Type, TypeVar

import pydantic
from prefect.engine.serializers import JSONSerializer

PydanticModel = TypeVar("PydanticModel", bound=pydantic.BaseModel)


class PydanticJsonSerializer(JSONSerializer, Generic[PydanticModel]):
    """
    Example use:
    `task_a = TaskA(result=PrefectResult(serializer=PydanticJsonSerializer(TaskAResult)))`
    """
    def __init__(self, pydantic_type: Type[PydanticModel]):
        super().__init__()
        self.pydantic_type: Type[PydanticModel] = pydantic_type

    def serialize(self, value: PydanticModel) -> bytes:
        return value.json(indent=2).encode()

    def deserialize(self, value: bytes) -> PydanticModel:
        return self.pydantic_type.parse_raw(value)
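For readers without pydantic, the same pattern (a serializer instance that carries the type it should rebuild) can be sketched with the standard library's dataclasses. `DataclassJsonSerializer` and `TaskAResult` are illustrative names, and the class deliberately does not subclass Prefect's `JSONSerializer` so the sketch stays self-contained.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Generic, Type, TypeVar

T = TypeVar("T")


class DataclassJsonSerializer(Generic[T]):
    """Serializer that remembers which dataclass it should rebuild."""

    def __init__(self, target_type: Type[T]):
        if not is_dataclass(target_type):
            raise TypeError(f"{target_type!r} is not a dataclass")
        self.target_type = target_type

    def serialize(self, value: T) -> bytes:
        return json.dumps(asdict(value), indent=2).encode()

    def deserialize(self, value: bytes) -> T:
        return self.target_type(**json.loads(value.decode()))


@dataclass
class TaskAResult:
    name: str
    score: float


serializer = DataclassJsonSerializer(TaskAResult)
payload = serializer.serialize(TaskAResult(name="a", score=0.5))
restored = serializer.deserialize(payload)
print(restored)  # TaskAResult(name='a', score=0.5)
```

It handles flat dataclasses only: `asdict` turns nested dataclasses into plain dicts, which `deserialize` would not rebuild. As with the pydantic version, the target type lives on the serializer instance, which is exactly the state that is lost when the result is reloaded from the backend.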

peterroelants · Jun 09 '21 06:06

Since I have access to the child flow's code after running StartFlowRun, I have been able to get the custom serializer from the flow's task.

I have created 3 possible tasks to get the results from a flow run:

  • GetFlowResult that returns Any.
  • GetFlowResultAsType that returns a given type from the result hydrated (deserialized) with a given Result object.
  • GetFlowResultTypedFromFlow that returns a given type, taking the Result object from the target task in the flow's code (needs access to the flow code).

These are based on my previous task to get a flow run result:
import abc
import logging
from typing import Any, Generic, List, Optional, Type, TypeVar

import prefect
from prefect.client.client import TaskRunInfoResult
from prefect.engine import signals
from prefect.engine.result import Result

TypeOfResult = TypeVar("TypeOfResult")


log = logging.getLogger(__name__)


class GetFlowResultABC(prefect.Task, Generic[TypeOfResult]):
    """
    Get flow result.

    The flow's result depends on a specific task with a given slug.
    """

    def __init__(
        self, target_task_slug: str, api_server: Optional[str] = None, **kwargs
    ):
        self.target_task_slug = target_task_slug
        self.api_server = api_server
        super().__init__(**kwargs)

    def run(  # type: ignore
        self, flow_run_signal: signals.PrefectStateSignal
    ) -> TypeOfResult:
        assert isinstance(flow_run_signal, signals.PrefectStateSignal)
        flow_run_id = parse_flow_run_id(flow_run_signal)
        return self.get_result(flow_run_id)

    @abc.abstractmethod
    def get_result(self, flow_run_id: str) -> TypeOfResult:
        ...

    def get_target_task_run_info(self, flow_run_id: str) -> TaskRunInfoResult:
        return get_target_task_run_info(
            flow_run_id=flow_run_id,
            target_task_slug=self.target_task_slug,
            api_server=self.api_server,
        )


class GetFlowResult(GetFlowResultABC):
    """
    Get flow result as Any.

    The flow's result depends on a specific task with a given slug.
    """

    def get_result(self, flow_run_id: str) -> Any:
        target_task_run = self.get_target_task_run_info(flow_run_id)
        return target_task_run.state.load_result(target_task_run.state.result).result


class GetFlowResultAsType(GetFlowResultABC):
    """
    Get the result from a Flow run by `StartFlowRun` as a typed result.

    The flow's result depends on a specific task with a given slug.

    A result hydrator must be passed explicitly.
    """

    def __init__(
        self,
        target_task_slug: str,
        result_hydrator: Result,
        result_type: Type[TypeOfResult],
        api_server: Optional[str] = None,
        **kwargs,
    ):
        self.result_type = result_type
        self.result_hydrator: Result = result_hydrator
        super().__init__(
            target_task_slug=target_task_slug, api_server=api_server, **kwargs
        )

    def get_result(self, flow_run_id: str) -> TypeOfResult:
        target_task_run = self.get_target_task_run_info(flow_run_id)
        result = target_task_run.state.load_result(self.result_hydrator).result
        assert isinstance(result, self.result_type)
        return result


class GetFlowResultTypedFromFlow(GetFlowResultAsType):
    """
    Get the result from a Flow run by `StartFlowRun` as a typed result.

    The flow's result depends on a specific task with a given slug.

    A reference to the flow object is needed to get its result hydrator.
    """

    def __init__(
        self,
        target_flow: prefect.Flow,
        target_task_slug: str,
        result_type: Type[TypeOfResult],
        api_server: Optional[str] = None,
        **kwargs,
    ):
        _target_task = get_task_from_flow(
            flow=target_flow, target_slug=target_task_slug
        )
        assert (
            _target_task.result
        ), f"Target task {_target_task!r} has not assigned a Result type."
        result_hydrator: Result = _target_task.result
        super().__init__(
            target_task_slug=target_task_slug,
            result_hydrator=result_hydrator,
            result_type=result_type,
            api_server=api_server,
            **kwargs,
        )


def get_task_from_flow(flow: prefect.Flow, target_slug: str) -> prefect.Task:
    tasks = list(filter(lambda t: t.slug and target_slug in t.slug, flow.tasks))
    assert len(tasks) <= 1, f"Multiple tasks found for slug={target_slug!r}!"
    assert len(tasks) > 0, f"No tasks found for slug={target_slug!r}!"
    return tasks[0]


def parse_flow_run_id(flow_run_signal: signals.PrefectStateSignal) -> str:
    return str(flow_run_signal).split(" ")[0]


def get_target_task_run_info(
    flow_run_id: str, target_task_slug: str, api_server: Optional[str] = None
) -> TaskRunInfoResult:
    """Get Task run info based on a flow run and a task slug."""
    # Fall back on config if `api_server` is None
    client = prefect.Client(api_server)
    log.info(f"Prefect Client created to connect with {client.api_server!r}.")
    flow_run_info = client.get_flow_run_info(flow_run_id)
    return get_target_task_run_from_slug(
        target_slug=target_task_slug, task_runs=flow_run_info.task_runs
    )


def get_target_task_run_from_slug(
    target_slug: str, task_runs: List[TaskRunInfoResult]
) -> TaskRunInfoResult:
    """Get a given task based on the task's slug from a list of task runs."""
    target_task_runs = list(
        filter(lambda t: t.task_slug and target_slug in t.task_slug, task_runs)
    )
    assert (
        len(target_task_runs) <= 1
    ), f"Multiple task runs found for slug={target_slug!r}!"
    assert len(target_task_runs) > 0, f"No task runs found for slug={target_slug!r}!"
    return target_task_runs[0]

This is a bit of a workaround, but seems to work with the custom pydantic serializer posted above.

peterroelants · Jun 09 '21 09:06

> I see, so if I understand correctly the state.load_result method calls PrefectResult.read. However, since the flow run (and its state and results) are fetched via client.get_flow_run_info, the result serializer gets lost because it is not stored on the backend itself. PrefectResult falls back on its default JSONSerializer, thus leading to the observed behavior?
>
> So we need to serialize the serializers, so they can be deserialized to deserialize the result?

Yep, exactly. However, as currently designed we can't serialize custom result classes, and with the same design we won't be able to serialize custom serializers either. We've got an overhaul of results on our roadmap, so a simpler and more robust experience is coming here eventually.

> Since I have access to the child-flow's code after running StartFlowRun I have been able to get the custom serializer from the flow's task.

We pull the serializer from the task itself during flow run execution as well. Sweet task!

zanieb · Jun 09 '21 14:06

To make calling subflows and getting their results easier, I wrapped StartFlowRun and the above GetFlowResult tasks in a new flow-run Task that:

  • Creates the parameters dict that needs to be passed to StartFlowRun.run.
  • Runs the flow via StartFlowRun and waits for the signal exception.
  • Parses the result and returns a typed result object.
import logging
from typing import Any, Dict

import prefect
from prefect.engine.results.prefect_result import PrefectResult
from prefect.engine.signals import FAIL, SUCCESS, PrefectStateSignal
from prefect.tasks.prefect import StartFlowRun

# GetFlowResultAsType and PydanticJsonSerializer are defined in the comments above.
# ExampleFlowResult, TypeA and TypeB are placeholders for your own types.

log = logging.getLogger(__name__)


class ExampleFlowRun(prefect.Task):
    """
    Run the Example flow and return the result.
    Flow needs to be registered on the correct project and name before running.
    """

    def __init__(self, project_name: str, **kwargs):
        super().__init__(**kwargs)
        self.start_flow_run = StartFlowRun(
            flow_name="Example-Flow-Name",
            project_name=project_name,
            wait=True,  # Wait for flow to finish
        )
        self.result_getter = GetFlowResultAsType(
            target_task_slug="example-flow-result-task-slug",
            result_hydrator=PrefectResult(serializer=PydanticJsonSerializer(ExampleFlowResult)),
            result_type=ExampleFlowResult,
        )

    def run(  # type: ignore
        self,
        value_a: TypeA,
        value_b: TypeB
    ) -> ExampleFlowResult:
        parameters = {
            "value_a": value_a,
            "value_b": value_b,
        }
        flow_run_signal = self._run_flow_safe(parameters=parameters)
        return self.result_getter.run(flow_run_signal)

    def _run_flow_safe(self, parameters: Dict[str, Any]) -> PrefectStateSignal:
        try:
            # With wait=True, StartFlowRun signals the child flow's final state
            self.start_flow_run.run(parameters=parameters)
        except SUCCESS as flow_run_signal:
            return flow_run_signal
        except FAIL as exc:
            log.exception(exc)
            raise FAIL(f"Example Flow Failed Flow Run: {exc!s}") from exc
        raise RuntimeError("This should not happen!")

ExampleFlowResult is a pydantic object here.

peterroelants · Jun 18 '21 14:06

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

github-actions[bot] · Jan 22 '23 13:01

This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.

github-actions[bot] · Feb 05 '23 14:02