Pydantic serialization
I'm tinkering with an automatic Pydantic (de)serialization integration. Here is what I have so far:
import dataclasses
from typing import Any
import dramatiq
from dramatiq.results import Results
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, validate_call
def pydantic_convert(data, type_annotation):
    """Use Pydantic to convert `data` to the type specified by `type_annotation`.

    Args:
        data: The raw value to validate/convert (e.g. a JSON-decoded result).
        type_annotation: The target type, as it would appear in a type hint.

    Returns:
        `data` validated and coerced to `type_annotation`.

    Raises:
        pydantic.ValidationError: If `data` cannot be converted.
    """
    # Imported locally: the module only imports `BaseModel` and `validate_call`
    # from pydantic, and the previous code reached for `create_model` through
    # the unrelated `pydantic_task` function, which has no such attribute.
    from pydantic import TypeAdapter

    return TypeAdapter(type_annotation).validate_python(data)
@dataclasses.dataclass(frozen=True)
class PydanticMessage[R](dramatiq.Message[R]):
"""Use Pydantic to convert the task result to the actor's return annotation type."""
def get_result(self, *args, **kwargs) -> R:
res = super().get_result(*args, **kwargs)
actor_fn = broker.get_actor(self.actor_name).fn
return pydantic_convert(res, actor_fn.__annotations__["return"])
class PydanticActor[R](dramatiq.Actor[..., R]):
def message_with_options(self, *, args: tuple = (), kwargs: dict[str, Any] | None = None, **options) -> PydanticMessage[R]:
message = super().message_with_options(args=jsonable_encoder(args), kwargs=jsonable_encoder(kwargs), **options)
return PydanticMessage(**message.asdict())
def pydantic_task():
    """Return a decorator that turns a function into a `PydanticActor`.

    The decorated function is first wrapped with `validate_call` (with return
    validation enabled) and then registered through `dramatiq.actor` using
    `PydanticActor` as the actor class.
    """

    def decorator(func):
        # Step 1: validate arguments and the return value on every call.
        add_validation = validate_call(validate_return=True)
        checked_func = add_validation(func)
        # Step 2: register the validated callable as a dramatiq actor.
        register = dramatiq.actor(actor_class=PydanticActor)
        return register(checked_func)

    return decorator
##########
# Example Pydantic model with a single integer field; used as both the
# argument type and the return type of the `example` actor.
class X(BaseModel):
    x: int
@pydantic_task()
def example(x: X) -> X:
    """Return a new `X` whose `x` field is one greater than the input's."""
    incremented = x.x + 1
    return X(x=incremented)
- It needs access to the `broker` instance to fetch the actor function's return type
- It needs a custom `Message` class to make `get_result` convert the result to the actor function's return type
We are using this snippet:
import json
from functools import partial
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import TypeVar
from typing import cast
from dramatiq.message import Message
from dramatiq.actor import Actor
from dramatiq.actor import actor
from pydantic import BaseModel
from pydantic import validate_arguments
from pydantic.decorator import V_DUPLICATE_KWARGS
from pydantic.decorator import V_POSITIONAL_ONLY_NAME
R = TypeVar('R')
class PydanticActor(Actor):
    """Actor whose keyword arguments are validated and JSON-serialized by Pydantic.

    The actor function is wrapped with `validate_arguments`; messages are built
    from the JSON form of the validated arguments. Only keyword arguments are
    supported.
    """

    def __init__(self, fn: Callable[..., R], **kwargs):
        """Wrap `fn` with `validate_arguments` and pass it on to `Actor`."""
        super().__init__(validate_arguments(fn), **kwargs)

    def message_with_options(
        self,
        *,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        **options,
    ) -> Message[R]:
        """Build a message whose kwargs are the JSON form of the validated arguments.

        Args:
            args: Must be empty; positional arguments are rejected.
            kwargs: Keyword arguments for the actor function. `None` is treated
                as "no arguments".
            **options: Forwarded unchanged to `Actor.message_with_options`.

        Raises:
            AssertionError: If any positional arguments are given.
        """
        # Raised explicitly instead of via `assert` so the check is not
        # stripped when Python runs with `-O`; the exception type is unchanged.
        if args:
            raise AssertionError('positional arguments are not allowed in pydantic model')
        # `kwargs` defaults to None, and `**None` would raise TypeError.
        call_kwargs = kwargs if kwargs is not None else {}
        instance = cast(BaseModel, self.fn.model(*args, **call_kwargs))
        # exclude extra fields created by `validate_arguments`
        exclude = {
            V_POSITIONAL_ONLY_NAME,
            V_DUPLICATE_KWARGS,
            self.fn.vd.v_args_name,
            self.fn.vd.v_kwargs_name,
        }
        # Round-trip through JSON so only JSON-compatible values reach the broker.
        json_kwargs = json.loads(instance.json(exclude=exclude))
        return super().message_with_options(args=args, kwargs=json_kwargs, **options)

    def send(self, /, **kwargs) -> Message[R]:
        """Send a message using keyword arguments only."""
        # positional arguments are not allowed in pydantic model
        return super().send(**kwargs)
# Drop-in replacement for the `actor` decorator with `actor_class` pre-bound
# to `PydanticActor`.
pydantic_actor = partial(actor, actor_class=PydanticActor)
But this actor has one limitation: it can't be used with asyncio, because `validate_arguments` is sync-only. I think it should be replaced by `validate_call`, because that supports asyncio.
It also does not support Pydantic/type-checked return values, right?
Yes. We do not need it because we do not use a result backend.
@synweap15 is this completed? It looks like it was closed as "not planned".
@spumer indeed not planned. Thanks.
Is accepting a PR for this not planned either?