taskiq-pipelines
Support non-JSON serializers
Hey! Thank you for the library.
For several reasons, I would like to use a CBOR serializer instead of JSON.
The system can be configured easily enough by overloading the `dumps()` and `loads()` methods of `Pipeline` and `AbstractStep`. It works pretty well, at least with the aio-pika broker, thanks to the fact that pika headers can be not only strings but also bytes.
The solution looks a little cumbersome, though: it requires overloading more methods than it should.
As a proper, generic resolution of the problem, I could imagine adding a new abstraction, `AsyncBroker.serializer`, which would be an instance of the following class:
```python
from abc import ABC, abstractmethod
from typing import Any


class TaskiqSerializer(ABC):
    @abstractmethod
    def dumps(self, value: Any) -> bytes: ...

    @abstractmethod
    def loads(self, value: bytes) -> Any: ...
```
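For example, a CBOR-backed implementation of this ABC could be a thin wrapper over the cbor2 package (`CborSerializer` is just an illustrative name, not an existing taskiq class):

```python
from typing import Any

import cbor2


class CborSerializer(TaskiqSerializer):
    """Sketch: delegate directly to cbor2."""

    def dumps(self, value: Any) -> bytes:
        return cbor2.dumps(value)

    def loads(self, value: bytes) -> Any:
        return cbor2.loads(value)
```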
The difference between `TaskiqSerializer` and `TaskiqFormatter` is that `TaskiqFormatter` works with `TaskiqMessage` only, while the serializer should operate on any primitive types supported by the underlying serialization system (things like decimals, datetimes, etc.). The serializer supersedes the formatter ABC and could be used instead, with deprecation of the `TaskiqFormatter` class and the `AsyncBroker.formatter` attribute.
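To make the intent concrete, here is a rough sketch of where the proposed attribute would sit in the dump/load path. This is purely illustrative: `AsyncBroker.serializer` and these helper functions do not exist in taskiq today.

```python
from taskiq import AsyncBroker, TaskiqMessage


def dump_message(broker: AsyncBroker, message: TaskiqMessage) -> bytes:
    # The broker serializes the plain structure produced by the model,
    # instead of asking a formatter to build a whole BrokerMessage.
    return broker.serializer.dumps(message.model_dump())


def load_message(broker: AsyncBroker, data: bytes) -> TaskiqMessage:
    return TaskiqMessage.model_validate(broker.serializer.loads(data))
```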
Another subtle problem is the type of `TaskiqMessage.headers` and `BrokerMessage.headers`. I don't think it is a showstopper; `AsyncTaskiqDecoratedTask.labels` is `Dict[str, Any]` already, which is the correct type, I believe.
What do you think? If you agree with my thoughts, I can try to create a PR for taskiq itself first and modify taskiq-pipelines later.
I'm very glad you liked the project, and the issue seems very reasonable to me.
Also, we can benefit from it by making support for custom types easier and removing these checks from the kicker: taskiq-python/taskiq@34db231/taskiq/kicker.py#L156
Regarding headers: header copying happens only in the RabbitMQ broker. Not all brokers support message headers. The main reason we made it this way is that we wanted to give people control over how they send messages.
I assume the most elegant solution is to separate message headers and task labels. That would also reduce the number of bytes sent to the broker, since right now we duplicate data between messages and headers, which isn't good, I presume.
Regarding the headers/labels duplication, I suggest leaving it for a separate improvement.
Speaking about the serializer, I think it should work with primitive terminal types only and know nothing about dataclasses or pydantic models. A datetime (even a timezone-aware one) is still a primitive type; a dataclass is not. The serializer has knowledge of JSON-like structures. It can also know how to work with additional types if the underlying binary format supports them. For example, msgpack and CBOR both support tagged types, and CBOR has standard tags for decimals, dates, and even URIs.
But I really don't think the serializer should do anything with pydantic models or dataclasses.
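To illustrate, here is a minimal sketch using the cbor2 package: both values below round-trip through standard CBOR tags without any model-level knowledge.

```python
from datetime import datetime, timezone
from decimal import Decimal

import cbor2

# Tag 4 (decimal fraction) and tag 0 (RFC 3339 datetime string) are
# applied automatically; no schema or model registration is involved.
payload = {
    "price": Decimal("19.99"),
    "created_at": datetime(2023, 1, 1, tzinfo=timezone.utc),
}
assert cbor2.loads(cbor2.dumps(payload)) == payload
```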
Okay. For now, I cannot see how we would use serializers. For example, you can currently create a CBOR formatter to have all your tasks serialized in this format:
```python
import cbor2

from taskiq.abc.formatter import TaskiqFormatter
from taskiq.message import BrokerMessage, TaskiqMessage


class CBORFormatter(TaskiqFormatter):
    def dumps(self, data: TaskiqMessage) -> BrokerMessage:
        return BrokerMessage(
            task_id=data.task_id,
            task_name=data.task_name,
            labels=data.labels,
            message=cbor2.dumps(data.model_dump()),
        )

    def loads(self, data: bytes) -> TaskiqMessage:
        # model_validate expects the parsed mapping itself, not kwargs.
        return TaskiqMessage.model_validate(cbor2.loads(data))
```
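Plugging it in is then a single assignment on the broker; a sketch, assuming the taskiq-aio-pika broker mentioned earlier and a local RabbitMQ instance:

```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")
broker.formatter = CBORFormatter()  # all task messages now travel as CBOR
```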
If you subscribe to a topic and send a message, you will see that the message is properly formatted.
If you want to serialize the pipeline data using CBOR, maybe it'd be easier to add a parameter to the `PipelineMiddleware` that is capable of serializing and deserializing pipeline data, along the lines of the sketch below.
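Purely hypothetical, since `PipelineMiddleware` accepts no such arguments today:

```python
import cbor2
from taskiq_aio_pika import AioPikaBroker
from taskiq_pipelines import PipelineMiddleware

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")

# Hypothetical dumps/loads parameters, shown only to illustrate the
# suggested extension; the current PipelineMiddleware does not take them.
broker.add_middlewares(
    PipelineMiddleware(dumps=cbor2.dumps, loads=cbor2.loads),
)
```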
But I still want to understand when and how the new abstraction might be used. Since it takes priority over a formatter and operates on values rather than a whole message, I don't see where serialize and deserialize should be called.