Adding support for pydantic model serialization
Checklist
- [x] I have checked the issues list for similar or identical feature requests.
- [x] I have checked the pull requests list for existing proposed implementations of this feature.
- [x] I have checked the commit log to find out if the same feature was already implemented in the main branch.
- [x] I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
Related Issues and Possible Duplicates
Related Issues
- None
Possible Duplicates
- None
Brief Summary
Hello, since Pydantic is a growing data-validation utility within the Python community and has become the de facto solution in some popular web frameworks, like FastAPI, I wonder why we don't allow passing Pydantic models as task arguments. Since Pydantic supports JSON serialization out of the box, I imagine this should be fairly doable.
Design
Architectural Considerations
Please note that I haven't gone through the code base extensively, so I don't know the best way to implement this; this is just an idea:
Add a custom serializer that wraps Pydantic's base model methods: model_dump_json() (in V2) or .json() (in V1).
One could add something like
# pydantic.py
import json

from pydantic import BaseModel

class PydanticEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, BaseModel):
            return o.model_dump()  # .dict() in Pydantic V1
        return super().default(o)

def encode(obj):
    return json.dumps(obj, cls=PydanticEncoder)

def decode(data):
    return json.loads(data)
and register it in kombu
from kombu.serialization import register
from .pydantic import encode, decode
register('pydantic-json', encode, decode,
         content_type='application/json',
         content_encoding='utf-8')
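If registered that way, one could then (hypothetically) select the serializer via Celery's standard settings, e.g.:
# Hypothetical wiring for the 'pydantic-json' serializer sketched above.
app.conf.task_serializer = 'pydantic-json'
app.conf.accept_content = ['pydantic-json', 'json']
# or per call:
# my_task.apply_async(args=[model], serializer='pydantic-json')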
Proposed Behavior
Given a pydantic model like
from pydantic import BaseModel
class TaskParams(BaseModel):
    foo: str
    bar: int
and a celery task that is defined like:
@shared_task
def my_task(foo: str, bar: int):
    ...
writing this like:
@shared_task
def my_task(params: TaskParams):
    ...
should result in the model being serialized to JSON and passed to the broker correctly by Celery, out of the box.
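That is, a call like the following (hypothetical, assuming the feature exists) would just work:
my_task.delay(TaskParams(foo="hello", bar=42))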
Proposed UI/UX
None.
When installing, this could be offered as an optional extra:
pip install celery[pydantic]
Diagrams
N/A
Alternatives
I haven't personally tried it out, but I guess there should be two options:
- Registering pydantic in kombu's JSON serializer, just like in the docs: https://docs.celeryq.dev/projects/kombu/en/latest/userguide/serialization.html
Would love this feature, tbh. I've been working with FastAPI + Pydantic + SQLAlchemy for a few years now and, especially with the recent developments in Pydantic V2, it's now very nice to use for serialisation/deserialisation.
My current approach, since this isn't available yet, is to call my_model.model_dump_json() on one end and Model.model_validate_json(json_blob) on the other. Any nitty-gritty of app-specific serialisation is defined directly in the Pydantic model class and automatically stays in sync between the Celery worker and my API, which issues the tasks.
Also, Pydantic has its own JSON parser integrated, and it handles types in very specific, predefined ways (e.g. datetimes), so I prefer to use model_dump_json(), which outputs a string, rather than model_dump(mode="json"), which outputs a dict and then lets Celery/Kombu do the serialisation down to JSON. I think a kombu serialiser should follow a similar approach.
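For reference, a minimal sketch of that manual round-trip (model and task names are hypothetical):
from pydantic import BaseModel

class Model(BaseModel):
    value: int

# Sender side: dump the model to a JSON string before dispatching.
json_blob = Model(value=3).model_dump_json()
# some_task.delay(json_blob)

# Worker side, inside the task body: validate back into the model.
model = Model.model_validate_json(json_blob)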
I got a rudimentary solution to this working: https://gist.github.com/badge/926165ff21ce438c8ddb58aba87021e0
I tried out @badge's solution, and it inspired me to build my own for my use case.
I specialized it to work with classes and generic inheritance. I think it could be made to work with decorators using the base argument. Here is a working example: https://gist.github.com/martinkozle/37c7eef95f9bbb5ace8bc6e32f379673
It works great with the celery-types type stubs and strict mypy type checking.
If Pydantic support isn't planned in Celery, I could turn my BasePydanticTask implementation into a separate package if there is interest.
@martinkozle can you please create a pull request for celery so we can upvote it?
> @martinkozle can you please create a pull request for celery so we can upvote it?
I do not believe the current implementation I provided is suitable for upstream Celery as-is. It was made for my use case and codebase. It has some questionable design decisions on my part, and some edge cases:
- It uses typing and generics for the TypeAdapter type, although this could be swapped for class-level variables instead.
- It creates side effects by registering a different kombu serializer for every task. I don't quite like this approach, but I didn't find a better way, and maybe there isn't one ¯\_(ツ)_/¯.
- I designed it to be used mainly with class-style tasks and inheritance, while the docs everywhere suggest using decorators. So the TypeAdapter type needs to be defined in a different way than via the Generic, which currently requires explicit inheritance. I also don't know whether it would otherwise be compatible with decorators.
- It doesn't work if you want to pass an argument that is a Union of Pydantic models with the same fields, since the same JSON would then map to multiple valid Pydantic models. Although that would just be bad model design when working with Pydantic and serialization in general.
If anyone has suggestions on how to better adapt this solution to fit inside Celery, I'd be happy to try to implement it. In the meantime I will explore some other possibilities.
Any additional discussion on this? I would love to have this.
Also consider this: https://benninger.ca/posts/celery-serializer-pydantic/ (an approach we're currently using; not our code, just thought it should be mentioned here).
We'd love to see a PR that implements this feature. If anyone is up for the task please create one and ask @Nusnus for a review.
> Also consider this: https://benninger.ca/posts/celery-serializer-pydantic/ (an approach we're currently using; not our code, just thought it should be mentioned here).
I had a look at this. It's a clever approach and I'm sure it works in most scenarios, but it relies on the caller defining the class (really, any class that implements a parse_obj() function) that the data will be loaded with. That just seems somehow "wrong" to me.
At the very least, the task still has to verify that this is the correct model, but this could theoretically even be a security issue, as the caller could trick the Celery worker into running different code, right?
Looking at this more closely today, I believe this is not really possible with Celery serializers as they are now: they receive just the object, not the task they are deserializing it for.
I'm trying a different approach: create a decorator that wraps shared_task(). Here's a simple proof of concept:
https://gist.github.com/mathiasertl/40432b4d653085441c20affd8a8764e6
For the logs, I get:
[2024-05-06 21:40:50,005: INFO/MainProcess] Task django_ca.tasks.test_task[e109b0b9-dcb3-40c8-b4fe-4f5ba479aa8a] received
[INFO 2024-05-06 21:40:50] Found typehints: {'data': <class 'django_ca.tasks.TestModel'>, 'return': <class 'django_ca.tasks.TestReturnModel'>}
[INFO 2024-05-06 21:40:50] Validated data: TestModel(value=3)
[INFO 2024-05-06 21:40:50] Task received value: TestModel(value=3)
[INFO 2024-05-06 21:40:50] Wrapper dumps model: TestReturnModel(value='returned value')
[2024-05-06 21:40:50,007: INFO/ForkPoolWorker-16] Task django_ca.tasks.test_task[e109b0b9-dcb3-40c8-b4fe-4f5ba479aa8a] succeeded in 0.0012026669992337702s: {'value': 'returned value'}
... this certainly needs refinement. But I'd be willing to create a PR if a maintainer (e.g. @Nusnus) would consider this approach.
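For illustration, a minimal sketch of what such a wrapper might look like (my simplification of the idea above, not the gist's actual code):
import functools
from typing import get_type_hints

from celery import shared_task
from pydantic import BaseModel

def pydantic_task(func):
    hints = get_type_hints(func)
    hints.pop("return", None)

    @shared_task
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Validate keyword arguments whose type hint is a Pydantic model.
        kwargs = {
            name: hints[name].model_validate(value)
            if name in hints
            and isinstance(hints[name], type)
            and issubclass(hints[name], BaseModel)
            else value
            for name, value in kwargs.items()
        }
        result = func(*args, **kwargs)
        # Dump a model return value back to a plain dict for the result backend.
        if isinstance(result, BaseModel):
            return result.model_dump()
        return result

    return wrapper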
I think that would be a really useful feature. @mathiasertl, a combination of the two approaches would be great: the Celery serializer should just .model_dump() all Pydantic models, and the wrapper should, based on the type hint, run model_validate for Pydantic models.
This would indeed be a very interesting feature
I don't think we want to use "duck typing" and just blindly call .model_dump() and .model_validate(), because then we won't have support for types like list[Model], tuple[Model, Model], dict[str, Model], etc.
For serialization you can just use to_json from pydantic_core, which works with any combination of nested Python data structures and Pydantic models. For deserialization I really like the idea of using type hints to get the types; you can then construct a TypeAdapter for each argument.
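A rough sketch of what that hint-based round-trip could look like (illustrative only, not the PR's actual code):
from typing import get_type_hints

from pydantic import BaseModel, TypeAdapter
from pydantic_core import from_json, to_json

class Model(BaseModel):
    value: int

def example_task(items: list[Model]) -> None:
    ...

# to_json handles arbitrary nesting of plain data and Pydantic models.
raw = to_json({"items": [Model(value=1), Model(value=2)]})

# Build one TypeAdapter per task argument from its type hints.
hints = get_type_hints(example_task)
adapters = {name: TypeAdapter(hint) for name, hint in hints.items() if name != "return"}

# On the worker, validate each decoded argument against its adapter.
validated = {name: adapters[name].validate_python(value)
             for name, value in from_json(raw).items()}
# validated["items"] is a list[Model] again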
As you can see, I made a PR in #9023 to attempt to implement this. Comments (and tests) are very welcome!
I have a quick solution.
- Define a helper function to register pydantic models:
from typing import Type, Dict
from functools import partial
from kombu.utils.json import register_type
from pydantic import BaseModel
def class_full_name(clz: Type[BaseModel]) -> str:
    return ".".join([clz.__module__, clz.__qualname__])

def _encoder(obj: BaseModel, *args, **kwargs) -> Dict:
    return obj.model_dump(*args, **kwargs)

def _decoder(clz: Type[BaseModel], data: Dict) -> BaseModel:
    return clz(**data)

def register_pydantic_types(*models: Type[BaseModel]):
    for model in models:
        register_type(
            model,
            class_full_name(model),
            encoder=_encoder,
            decoder=partial(_decoder, model),
        )
- Call this helper with all pydantic models:
app = Celery('my_app')
# ...
register_pydantic_types(TestModel, TestModel2)
Now you can use TestModel and TestModel2 in any Celery task.
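For instance, assuming a hypothetical TestModel with a single value: int field:
@app.task
def process(model: TestModel) -> int:
    return model.value

# The model round-trips through kombu's JSON serializer transparently:
process.delay(TestModel(value=1))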
Very cool solution; I can't believe register_type has existed for over a year and no one thought to use it for Pydantic.
A question I have: does this use the typing of the argument and the class-name marker when deciding which decoder to use, or does it just try all the registered types in order until one works? For example, does it handle two Pydantic models with exactly the same fields but different methods? This can happen when subtyping for functionality, for example. If a task has two different arguments typed with such models, will it differentiate which decoder to use? This was an issue I had with my solution, but I used a single registered serializer with a TypeAdapter for all args.
> def _decoder(clz: Type[BaseModel], data: Dict) -> BaseModel: return clz(**data)
Also, for _decoder I would suggest using clz.model_validate(data) instead.
The problem with this approach is that it does not really consider the type hint in the task when decoding. It encodes the model with the registered encoder and then uses the corresponding decoder, so while it does allow passing Pydantic models, it does not guarantee that the model you are sending is the one the task expects. To get that, you would probably need to define a decorator that validates the input arguments.
I actually got it working now by combining @zeroohub's solution with a custom BaseTask:
from inspect import signature

from celery import Task, Celery

class BaseTask(Task):
    def __call__(self, *args, **kwargs):
        sig = signature(self.__wrapped__)
        bound_args = sig.bind(*args, **kwargs)
        for arg_name, arg_value in bound_args.arguments.items():
            if arg_name in self.__wrapped__.__annotations__:
                typehint = self.__wrapped__.__annotations__[arg_name]
                if not isinstance(arg_value, typehint):
                    raise ValueError(f"Argument {arg_name} is not of type {typehint}")
        result = super(BaseTask, self).__call__(*args, **kwargs)
        return result

    def on_success(self, retval, task_id, args, kwargs):
        super(BaseTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo)

app = Celery(...)
app.Task = BaseTask
Thank you @mathiasertl for implementing the support in #9023. It will be added to the new features of Celery v5.5.0 (currently in Beta).
I'm very keen to try out this feature and checked the documentation for an example. This is going to be a popular feature in the beta!
Is this in a public beta yet? It appears to be merged.
I'm using celery = "^5.5.0b2"
FYI I also did po add git+https://github.com/celery/celery.git
which gives:
- Installing celery (5.5.0b2 a187891)
but I'm getting this error:
kombu.exceptions.EncodeError: Object of type QBotMessage is not JSON serializable
I have a task defined like this:
@qb_server.task(pydantic=True)
def ping_msg_task(msg: QBotMessage) -> QBotMessage:
    assert isinstance(msg, QBotMessage)
    clogger.info('pinged with: =>', msg)
    msg.query = 'pong'
    return msg
and the model is:
class QBotMessage(BaseModel):
    query: Optional[str] = ""
    subqueries: Optional[list[str]] = []
    ok: Optional[bool] = False
    eval: Optional[str] = ""
I'm using RabbitMQ as the broker and rpc backend.
qb_server = Celery(
    'voyager.services.qbot',
    broker=env_get('CELERY_BROKER_URL'),
    backend=env_get('CELERY_BACKEND_URL'),
    include=['voyager.services.qbot.tasks.tasklist'],
    serializer='json',
    result_expires=3600,
)
## json is default but in case...
qb_server.conf.update(
    task_serializer="json",
    result_serializer="json",
    event_serializer="json",
    accept_content=["application/json"],
    result_accept_content=["application/json"],
)
I believe JSON is the default, but just in case I also tried the serializer param on the task decorator:
@qb_server.task(pydantic=True, serializer='json')
Are there certain restrictions, for example, are Optional fields not supported?
I also checked that the model instance dumps fine with blob = msg.model_dump_json().
I can paste the full trace but don't want to spam this issue too much.
File "/Users/dc/dev/proj/tealbook/voyager/.venv/lib/python3.12/site-packages/kombu/serialization.py", line 220, in dumps
payload = encoder(data)
^^^^^^^^^^^^^
File "/Users/dc/dev/proj/tealbook/voyager/.venv/lib/python3.12/site-packages/kombu/utils/json.py", line 63, in dumps
return _dumps(s, cls=cls, **dict(default_kwargs, **kwargs))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/__init__.py", line 238, in dumps
**kw).encode(obj)
^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 200, in encode
chunks = self.iterencode(o, _one_shot=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 258, in iterencode
return _iterencode(o, 0)
^^^^^^^^^^^^^^^^^
File "/Users/dc/dev/proj/tealbook/voyager/.venv/lib/python3.12/site-packages/kombu/utils/json.py", line 47, in default
return super().default(o)
^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/[email protected]/3.12.3/Frameworks/Python.framework/Versions/3.12/lib/python3.12/json/encoder.py", line 180, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type QBotMessage is not JSON serializable
error: Recipe `qbot-client` failed on line 250 with exit code 1
Googling also found me this: https://benninger.ca/posts/celery-serializer-pydantic/
@mathiasertl Can you take a look at the problem @dcsan is having?
Hi @dcsan !
I'm replying while waiting for breakfast on vacation. Excuse my brevity; I'll have more time starting Sunday.
I'm not sure you're using the feature right in the first place. The decorator converts parameters and return values on the task side, i.e. in the Celery worker about to execute the task. It hooks in after the message has been received and loaded.
It does not automatically serialize Pydantic models on the sender side (e.g. when calling .delay()). It sounds to me like this is what you're doing?
If yes, that explains it, but it's also a strong hint that I need to make the documentation clearer.
KR, mat
Indeed, I assumed it was working in both directions. So for sending data into the system I still need to serialize it myself? Got it, thanks for the help!
Yes, exactly. Please let me know if there are any further issues!
I'll make a small pull request clarifying documentation on the weekend.
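For reference, my reading of the resulting usage pattern with @dcsan's example above (the exact call is my assumption, not confirmed code):
# Worker side: pydantic=True validates the incoming dict into QBotMessage.
# Sender side: pass a plain dict, e.g. by dumping the model yourself:
msg = QBotMessage(query='ping')
result = ping_msg_task.delay(msg.model_dump())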