prefect
Custom Result Serializer deserialize method is not used when loading result from flow run
Description
I have a Task with a PrefectResult that uses a custom serializer that is a subclass of JSONSerializer:
@prefect.task(result=PrefectResult(serializer=CustomJsonSerializer()))
When running a flow with this Task via StartFlowRun and fetching the result via prefect.Client, the result is not deserialized by my custom serializer, but rather by the default JSONSerializer.
Expected Behavior
I would expect a custom serializer added to a Prefect Result to be used for deserialization when loading the result.
Reproduction
I've tried to make a minimal toy example reproducing the issue:
The following code:
from dataclasses import dataclass

import prefect
from prefect.engine import signals, state
from prefect.engine.results.prefect_result import PrefectResult
from prefect.engine.serializers import JSONSerializer
from prefect.tasks.prefect import StartFlowRun


@dataclass
class ResultClass:
    test_var: int


class CustomJsonSerializer(JSONSerializer):
    def serialize(self, value: ResultClass) -> bytes:
        return str(value.test_var).encode()

    def deserialize(self, value: bytes) -> ResultClass:
        return ResultClass(test_var=int(value.decode()))


@prefect.task(result=PrefectResult(serializer=CustomJsonSerializer()))
def test_task() -> ResultClass:
    return ResultClass(test_var=42)


def main():
    # Create flow and register
    with prefect.Flow("test-flow") as flow:
        res = test_task()
    flow.register(project_name="test_project")

    # Run flow
    flow_run = StartFlowRun(
        flow_name="test-flow",
        project_name="test_project",
        wait=True,
    )
    try:
        flow_run.run()
    except signals.SUCCESS as exc:
        flow_state = exc.state
    assert isinstance(flow_state, state.Success)

    # Parse result
    flow_run_id = str(flow_state.result).split(" ")[0]
    client = prefect.Client()
    flow_run_info = client.get_flow_run_info(flow_run_id)
    task_run = flow_run_info.task_runs[0]
    task_result = task_run.state.load_result(task_run.state._result).result
    print("task_result: ", task_result)
    print("type(task_result): ", type(task_result))
    assert isinstance(task_result, ResultClass)


if __name__ == "__main__":
    main()
produces the following output:
task_result: 42
type(task_result): <class 'int'>
Traceback (most recent call last):
  File "/home/peter/workspace/sandbox/tmp/deserialize_prefect.py", line 56, in <module>
    main()
  File "/home/peter/workspace/sandbox/tmp/deserialize_prefect.py", line 52, in main
    assert isinstance(task_result, ResultClass)
AssertionError
Note that the result type is int, not the ResultClass you would expect from the custom deserialize method.
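A minimal sketch of why the value comes back as the integer 42, assuming the default JSONSerializer deserializes by running json.loads on the decoded bytes (an assumption about Prefect's internals; the functions below are stand-ins, not Prefect's API). The custom serialize writes b"42", which also happens to be valid JSON, so the default path quietly returns an int instead of failing.

```python
import json
from dataclasses import dataclass


@dataclass
class ResultClass:
    test_var: int


def custom_serialize(value: ResultClass) -> bytes:
    # Same logic as CustomJsonSerializer.serialize in the reproduction
    return str(value.test_var).encode()


def custom_deserialize(value: bytes) -> ResultClass:
    # Same logic as CustomJsonSerializer.deserialize in the reproduction
    return ResultClass(test_var=int(value.decode()))


def default_json_deserialize(value: bytes):
    # Assumed stand-in for the default JSONSerializer.deserialize
    return json.loads(value.decode("utf-8"))


payload = custom_serialize(ResultClass(test_var=42))
print(payload)                            # b'42'
print(default_json_deserialize(payload))  # 42, a plain int
print(custom_deserialize(payload))        # ResultClass(test_var=42)
```

The same bytes decode to different Python types depending only on which serializer reads them, which matches the int seen in the output above.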
Environment
{
    "config_overrides": {},
    "env_vars": [],
    "system_information": {
        "platform": "Linux-5.10.0-1029-oem-x86_64-with-glibc2.31",
        "prefect_backend": "server",
        "prefect_version": "0.14.21",
        "python_version": "3.9.4"
    }
}
Hey @peterroelants, thanks for the thorough report.
This is a bit tricky; the serialized Result type in the backend does not include the serializer attribute, so when you retrieve your task run state there you're receiving a PrefectResult instance without your custom serializer. During flow execution, the Result instance is pulled directly from the Task so we can fully support custom result types and serializers (which is why it is written to the database correctly). This means that within the context of flow run execution, it will also be read correctly from the database. Pulling the task state from the database and loading the result manually will not work, as you've seen.
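The mechanism described above can be modelled in plain Python. Everything here (ToySerializer, TaggingSerializer, ToyResult, to_backend, from_backend) is hypothetical and only mimics the behaviour described in this thread; it is not Prefect's result or schema code. The backend form keeps the result type and location but drops the serializer, so a result rebuilt from it reads the same payload with the default serializer.

```python
import json
from typing import Any, Dict, Optional


class ToySerializer:
    """Hypothetical default serializer: plain JSON."""

    def serialize(self, value: Any) -> bytes:
        return json.dumps(value).encode()

    def deserialize(self, value: bytes) -> Any:
        return json.loads(value.decode())


class TaggingSerializer(ToySerializer):
    """Hypothetical custom serializer: tags every value it deserializes."""

    def deserialize(self, value: bytes) -> Any:
        return ("custom", super().deserialize(value))


class ToyResult:
    """Hypothetical result that stores the serialized value as its location."""

    def __init__(
        self, serializer: Optional[ToySerializer] = None, location: Optional[str] = None
    ):
        self.serializer = serializer or ToySerializer()
        self.location = location

    def write(self, value: Any) -> "ToyResult":
        return ToyResult(self.serializer, self.serializer.serialize(value).decode())

    def read(self) -> Any:
        return self.serializer.deserialize(self.location.encode())


def to_backend(result: ToyResult) -> Dict[str, Any]:
    # Mirrors the gap described above: the serializer is not recorded.
    return {"type": type(result).__name__, "location": result.location}


def from_backend(data: Dict[str, Any]) -> ToyResult:
    # No serializer information is available, so the default one is used.
    return ToyResult(location=data["location"])


stored = ToyResult(TaggingSerializer()).write(42)
print(stored.read())                            # ('custom', 42)
print(from_backend(to_backend(stored)).read())  # 42
```

Reading through the object that still holds the custom serializer (as during flow execution, where the Result comes from the Task) gives the custom value; reading through the object rebuilt from the backend form gives the default decoding of the same bytes.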
To make this work as expected we'll have to:
- Add ResultSerializerSchema and sub-schemas to prefect.serialization.results
- Add a CustomSerializerSchema to serialize custom Serializer subtypes into qualified names and deserialize by import
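A minimal sketch of the "qualified name, deserialize by import" idea from the second bullet, using only importlib. The helper names class_to_ref and class_from_ref are hypothetical and are not part of prefect.serialization. Walking the qualname attribute by attribute also covers classes nested inside other classes.

```python
import importlib
from json import JSONDecoder  # only used for the example below
from typing import Any, Dict


def class_to_ref(cls: type) -> Dict[str, str]:
    """Record where a class is defined so it can be re-imported later."""
    return {"module": cls.__module__, "qualname": cls.__qualname__}


def class_from_ref(ref: Dict[str, str]) -> type:
    """Import the module, then follow the qualname (handles nested classes).

    Classes defined inside functions ("<locals>" in the qualname) cannot be
    resolved this way and raise AttributeError.
    """
    obj: Any = importlib.import_module(ref["module"])
    for attr in ref["qualname"].split("."):
        obj = getattr(obj, attr)
    return obj


ref = class_to_ref(JSONDecoder)
print(ref)  # {'module': 'json.decoder', 'qualname': 'JSONDecoder'}
print(class_from_ref(ref) is JSONDecoder)  # True
```

This records only the class. A serializer that carries constructor state, such as the PydanticJsonSerializer posted later in this thread (which holds a pydantic_type), would need that state recorded as well, for example as a second class reference.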
This is a pretty significant change so I'm not sure when we will be able to do this. At the very least, we should add the name of the serializer to the ResultSchema so we can warn when someone tries to use it this way.
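The stop-gap in the paragraph above, storing the serializer's name so a mismatch can at least be reported, could look roughly like this. serializer_name and warn_if_serializer_differs are hypothetical helpers, not Prefect functions, and the idea that the stored name comes from a new field on ResultSchema is an assumption.

```python
import warnings
from typing import Any, Optional


def serializer_name(serializer: Any) -> str:
    """Qualified class name to store alongside a result (hypothetical helper)."""
    cls = type(serializer)
    return f"{cls.__module__}.{cls.__qualname__}"


def warn_if_serializer_differs(stored_name: Optional[str], current: Any) -> bool:
    """Warn when a result is read with a serializer other than the one that wrote it.

    Returns True if a warning was emitted. A missing stored name (older
    records) is treated as "unknown" and does not warn.
    """
    current_name = serializer_name(current)
    if stored_name is not None and stored_name != current_name:
        warnings.warn(
            f"Result was written with {stored_name} but is being read with "
            f"{current_name}; the value may not round-trip.",
            stacklevel=2,
        )
        return True
    return False


# Hypothetical serializer classes for illustration
class DefaultSerializer:
    pass


class CustomSerializer:
    pass


stored = serializer_name(CustomSerializer())
warn_if_serializer_differs(stored, DefaultSerializer())  # emits a UserWarning
```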
I see, so if I understand correctly the state.load_result method calls PrefectResult.read. However, since the flow run (and its state and results) are fetched via client.get_flow_run_info, the result serializer gets lost because it is not stored on the backend itself. PrefectResult falls back on its default JSONSerializer, thus leading to the observed behavior?
So we need to serialize the serializers, so they can be deserialized to deserialize the result?
Btw, to give more detail, I'm currently using this to serialize/deserialize pydantic objects that I use as the data model between my tasks/flows. For example, this is the custom serializer that I'm using at the moment:
from typing import Generic, Type, TypeVar

import pydantic
from prefect.engine.serializers import JSONSerializer

PydanticModel = TypeVar("PydanticModel", bound=pydantic.BaseModel)


class PydanticJsonSerializer(JSONSerializer, Generic[PydanticModel]):
    """
    Example use:
    `task_a = TaskA(result=PrefectResult(serializer=PydanticJsonSerializer(TaskAResult)))`
    """

    def __init__(self, pydantic_type: Type[PydanticModel]):
        super().__init__()
        self.pydantic_type: Type[PydanticModel] = pydantic_type

    def serialize(self, value: PydanticModel) -> bytes:
        return value.json(indent=2).encode()

    def deserialize(self, value: bytes) -> PydanticModel:
        return self.pydantic_type.parse_raw(value)
Since I have access to the child flow's code after running StartFlowRun, I have been able to get the custom serializer from the flow's task.
I have created 3 possible tasks to get the results from a flow run:
- GetFlowResult, which returns Any.
- GetFlowResultAsType, which returns a given type from the result hydrated (deserialized) by a given Result object.
- GetFlowResultTypedFromFlow, which returns a given type using the Result object taken from the flow's code (needs access to the flow code).
These are based on my previous task to get a flow run result:
import abc
import logging
from typing import Any, Generic, List, Optional, Type, TypeVar

import prefect
from prefect.client.client import TaskRunInfoResult
from prefect.engine import signals
from prefect.engine.result import Result

TypeOfResult = TypeVar("TypeOfResult")

log = logging.getLogger(__name__)


class GetFlowResultABC(prefect.Task, Generic[TypeOfResult]):
    """
    Get flow result.
    The flow's result depends on a specific task with a given slug.
    """

    def __init__(
        self, target_task_slug: str, api_server: Optional[str] = None, **kwargs
    ):
        self.target_task_slug = target_task_slug
        self.api_server = api_server
        super().__init__(**kwargs)

    def run(  # type: ignore
        self, flow_run_signal: signals.PrefectStateSignal
    ) -> TypeOfResult:
        assert isinstance(flow_run_signal, signals.PrefectStateSignal)
        flow_run_id = parse_flow_run_id(flow_run_signal)
        return self.get_result(flow_run_id)

    @abc.abstractmethod
    def get_result(self, flow_run_id: str) -> TypeOfResult:
        ...

    def get_target_task_run_info(self, flow_run_id: str) -> TaskRunInfoResult:
        return get_target_task_run_info(
            flow_run_id=flow_run_id,
            target_task_slug=self.target_task_slug,
            api_server=self.api_server,
        )


class GetFlowResult(GetFlowResultABC):
    """
    Get flow result as Any.
    The flow's result depends on a specific task with a given slug.
    """

    def get_result(self, flow_run_id: str) -> Any:
        target_task_run = self.get_target_task_run_info(flow_run_id)
        # Pass the stored Result object (as in the reproduction above);
        # state.result is the value itself, not a Result.
        return target_task_run.state.load_result(target_task_run.state._result).result


class GetFlowResultAsType(GetFlowResultABC):
    """
    Get the result from a Flow run by `StartFlowRun` as a typed result.
    The flow's result depends on a specific task with a given slug.
    A result hydrator needs to be passed explicitly.
    """

    def __init__(
        self,
        target_task_slug: str,
        result_hydrator: Result,
        result_type: Type[TypeOfResult],
        api_server: Optional[str] = None,
        **kwargs,
    ):
        self.result_type = result_type
        self.result_hydrator: Result = result_hydrator
        super().__init__(
            target_task_slug=target_task_slug, api_server=api_server, **kwargs
        )

    def get_result(self, flow_run_id: str) -> TypeOfResult:
        target_task_run = self.get_target_task_run_info(flow_run_id)
        # Read with the caller's Result, which carries the custom serializer
        result = target_task_run.state.load_result(self.result_hydrator).result
        assert isinstance(result, self.result_type)
        return result


class GetFlowResultTypedFromFlow(GetFlowResultAsType):
    """
    Get the result from a Flow run by `StartFlowRun` as a typed result.
    The flow's result depends on a specific task with a given slug.
    A reference to the flow object is needed to get its result hydrator.
    """

    def __init__(
        self,
        target_flow: prefect.Flow,
        target_task_slug: str,
        result_type: Type[TypeOfResult],
        api_server: Optional[str] = None,
        **kwargs,
    ):
        _target_task = get_task_from_flow(
            flow=target_flow, target_slug=target_task_slug
        )
        assert (
            _target_task.result
        ), f"Target task {_target_task!r} has not assigned a Result type."
        result_hydrator: Result = _target_task.result
        super().__init__(
            target_task_slug=target_task_slug,
            result_hydrator=result_hydrator,
            result_type=result_type,
            api_server=api_server,
            **kwargs,
        )


def get_task_from_flow(flow: prefect.Flow, target_slug: str) -> prefect.Task:
    tasks = list(filter(lambda t: t.slug and target_slug in t.slug, flow.tasks))
    assert len(tasks) <= 1, f"Multiple tasks found for slug={target_slug!r}!"
    assert len(tasks) > 0, f"No tasks found for slug={target_slug!r}!"
    return tasks[0]


def parse_flow_run_id(flow_run_signal: signals.PrefectStateSignal) -> str:
    return str(flow_run_signal).split(" ")[0]


def get_target_task_run_info(
    flow_run_id: str, target_task_slug: str, api_server: Optional[str] = None
) -> TaskRunInfoResult:
    """Get Task run info based on a flow run and a task slug."""
    # Fall back on config if `api_server` is None
    client = prefect.Client(api_server)
    log.info(f"Prefect Client created to connect with {client.api_server!r}.")
    flow_run_info = client.get_flow_run_info(flow_run_id)
    return get_target_task_run_from_slug(
        target_slug=target_task_slug, task_runs=flow_run_info.task_runs
    )


def get_target_task_run_from_slug(
    target_slug: str, task_runs: List[TaskRunInfoResult]
) -> TaskRunInfoResult:
    """Get a given task based on the task's slug from a list of task runs."""
    target_task_runs = list(
        filter(lambda t: t.task_slug and target_slug in t.task_slug, task_runs)
    )
    assert (
        len(target_task_runs) <= 1
    ), f"Multiple task runs found for slug={target_slug!r}!"
    assert len(target_task_runs) > 0, f"No task runs found for slug={target_slug!r}!"
    return target_task_runs[0]
This is a bit of a workaround, but seems to work with the custom pydantic serializer posted above.
> I see, so if I understand correctly the state.load_result method calls PrefectResult.read. However, since the flow run (and its state and results) are fetched via client.get_flow_run_info, the result serializer gets lost because it is not stored on the backend itself. PrefectResult falls back on its default JSONSerializer, thus leading to the observed behavior?
> So we need to serialize the serializers, so they can be deserialized to deserialize the result?
Yep, exactly. However, as written right now we can't serialize custom result classes, and with the same design we won't be able to serialize custom serializers. We've got an overhaul of results on our roadmap, so a simpler and more robust experience is coming here eventually.
> Since I have access to the child-flow's code after running StartFlowRun I have been able to get the custom serializer from the flow's task.
We pull the serializer from the task itself during flow run execution as well. Sweet task!
To make calling subflows and getting their results easier, I wrapped StartFlowRun and the above GetFlowResult tasks in a new Flow Run Task that:
- Creates the parameters dict that needs to be passed to StartFlowRun.run.
- Runs the flow via StartFlowRun and waits for the signal exception.
- Parses the result and returns a typed result object.
from typing import Any, Dict

import prefect
from prefect.engine.results.prefect_result import PrefectResult
from prefect.engine.signals import FAIL, SUCCESS, PrefectStateSignal
from prefect.tasks.prefect import StartFlowRun

# Reuses `log`, `GetFlowResultAsType` and `PydanticJsonSerializer` from the snippets above.
# `TypeA`, `TypeB` and `ExampleFlowResult` stand for the flow's own parameter/result types.


class ExampleFlowRun(prefect.Task):
    """
    Run the Example flow and return the result.
    Flow needs to be registered on the correct project and name before running.
    """

    def __init__(self, project_name: str, **kwargs):
        super().__init__(**kwargs)
        self.start_flow_run = StartFlowRun(
            flow_name="Example-Flow-Name",
            project_name=project_name,
            wait=True,  # Wait for flow to finish
        )
        self.result_getter = GetFlowResultAsType(
            target_task_slug="example-flow-result-task-slug",
            result_hydrator=PrefectResult(
                serializer=PydanticJsonSerializer(ExampleFlowResult)
            ),
            result_type=ExampleFlowResult,
        )

    def run(  # type: ignore
        self,
        value_a: TypeA,
        value_b: TypeB,
    ) -> ExampleFlowResult:
        parameters = {
            "value_a": value_a,
            "value_b": value_b,
        }
        flow_run_signal = self._run_flow_safe(parameters=parameters)
        return self.result_getter.run(flow_run_signal)

    def _run_flow_safe(self, parameters: Dict[str, Any]) -> PrefectStateSignal:
        try:
            self.start_flow_run.run(parameters=parameters)
        except SUCCESS as flow_run_signal:
            return flow_run_signal
        except FAIL as exc:
            log.exception(exc)
            raise FAIL(f"Example Flow Failed Flow Run: {exc!s}") from exc
        raise RuntimeError("This should not happen!")
ExampleFlowResult is a pydantic object here.
This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.
This issue was closed because it has been stale for 14 days with no activity. If this issue is important or you have more to add feel free to re-open it.