dramatiq icon indicating copy to clipboard operation
dramatiq copied to clipboard

Pydantic serialization

Open jonashaag opened this issue 1 year ago • 3 comments

I'm tinkering with an automatic Pydantic (de)serialization integration. Here is what I have so far:

import dataclasses
from typing import Any

import dramatiq
from dramatiq.results import Results
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, validate_call


def pydantic_convert(data, type_annotation):
    """Use Pydantic to convert `data` to the type specified by `type_annotation`.

    Args:
        data: The value to validate/coerce (e.g. a decoded task result).
        type_annotation: The target type, typically an actor's return annotation.

    Returns:
        `data` validated and converted to `type_annotation`.

    Raises:
        pydantic.ValidationError: If `data` cannot be converted.
    """
    # The original called `pydantic_task.create_model(...)`, but `pydantic_task`
    # is this module's decorator factory and has no such attribute.
    # `TypeAdapter` validates against an arbitrary annotation without building
    # a throwaway model. Imported locally because the module-level import only
    # brings in `BaseModel` and `validate_call`.
    from pydantic import TypeAdapter

    return TypeAdapter(type_annotation).validate_python(data)


@dataclasses.dataclass(frozen=True)
class PydanticMessage[R](dramatiq.Message[R]):
    """Use Pydantic to convert the task result to the actor's return annotation type."""

    def get_result(self, *args, **kwargs) -> R:
        res = super().get_result(*args, **kwargs)
        actor_fn = broker.get_actor(self.actor_name).fn
        return pydantic_convert(res, actor_fn.__annotations__["return"])


class PydanticActor[R](dramatiq.Actor[..., R]):
    def message_with_options(self, *, args: tuple = (), kwargs: dict[str, Any] | None = None, **options) -> PydanticMessage[R]:
        message = super().message_with_options(args=jsonable_encoder(args), kwargs=jsonable_encoder(kwargs), **options)
        return PydanticMessage(**message.asdict())


def pydantic_task():
    """Return a decorator that registers a function as a `PydanticActor`.

    The decorated function is first wrapped with
    `validate_call(validate_return=True)` and then passed to `dramatiq.actor`
    with `actor_class=PydanticActor`.
    """

    def register(func):
        # Wrap first, so the actor is built around the validating callable.
        checked = validate_call(validate_return=True)(func)
        make_actor = dramatiq.actor(actor_class=PydanticActor)
        return make_actor(checked)

    return register


##########


class X(BaseModel):
    """Minimal example model with a single integer field `x`."""

    x: int


@pydantic_task()
def example(x: X) -> X:
    """Example task: return a new `X` whose `x` is the input's `x` plus one."""
    incremented = x.x + 1
    return X(x=incremented)
  • It needs access to the broker instance to fetch the actor function's return type
  • It needs a custom Message class to make get_result convert the result to the actor function's return type

jonashaag avatar Oct 28 '24 13:10 jonashaag

We are using this snippet:

import json
from functools import partial
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import TypeVar
from typing import cast

from dramatiq.message import Message

from dramatiq.actor import Actor
from dramatiq.actor import actor
from pydantic import BaseModel
from pydantic import validate_arguments
from pydantic.decorator import V_DUPLICATE_KWARGS
from pydantic.decorator import V_POSITIONAL_ONLY_NAME

# Type variable used below in `Callable[..., R]` and `Message[R]` annotations.
R = TypeVar('R')


class PydanticActor(Actor):
    """Actor that validates keyword arguments with Pydantic and sends them as JSON data.

    The wrapped function is decorated with `validate_arguments`; outgoing
    messages carry the JSON-serialised form of the validated arguments.
    Only keyword arguments are supported.
    """

    def __init__(self, fn: Callable[..., R], **kwargs):
        """Wrap `fn` with `validate_arguments` and forward everything else to `Actor`."""
        super().__init__(validate_arguments(fn), **kwargs)

    def message_with_options(
        self,
        *,
        args: tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
        **options,
    ) -> Message[R]:
        """Build a message whose kwargs are the JSON form of the validated arguments.

        Args:
            args: Must be empty; positional arguments are rejected.
            kwargs: Keyword arguments for the actor function; `None` is
                treated as no arguments.
            **options: Forwarded unchanged to `Actor.message_with_options`.

        Raises:
            AssertionError: If any positional arguments are given.
        """
        if args:
            # Explicit raise instead of `assert`, so the check is not stripped
            # under `python -O`; exception type and message are unchanged.
            raise AssertionError('positional arguments are not allowed in pydantic model')
        # `kwargs` defaults to None, and `**None` would raise TypeError.
        instance = cast(BaseModel, self.fn.model(**(kwargs or {})))
        # exclude extra fields created by `validate_arguments`
        exclude = {
            V_POSITIONAL_ONLY_NAME,
            V_DUPLICATE_KWARGS,
            self.fn.vd.v_args_name,
            self.fn.vd.v_kwargs_name,
        }
        # Round-trip through JSON so the message only carries JSON-native values.
        json_kwargs = json.loads(instance.json(exclude=exclude))

        return super().message_with_options(args=args, kwargs=json_kwargs, **options)

    def send(self, /, **kwargs) -> Message[R]:
        """Send a message built from keyword arguments only."""
        # positional arguments are not allowed in pydantic model
        return super().send(**kwargs)


# The `actor` decorator pre-bound with `actor_class=PydanticActor`.
pydantic_actor = partial(actor, actor_class=PydanticActor)

But this actor has one limitation: it can't be used with asyncio due validate_arguments is sync. I think it should be replaced by validate_call, cause it support asyncio

spumer avatar Oct 28 '24 14:10 spumer

It also does not support Pydantic/type checked return values right?

jonashaag avatar Oct 28 '24 14:10 jonashaag

Yes. We do not need it because do not use result backend

spumer avatar Oct 28 '24 17:10 spumer

@synweap15 it's completed? Looks like "not planned"?

Image

spumer avatar Aug 27 '25 10:08 spumer

@spumer indeed not planned. Thanks.

synweap15 avatar Aug 27 '25 10:08 synweap15

Not planned accept PR too?

spumer avatar Aug 28 '25 17:08 spumer