taskiq
taskiq copied to clipboard
Exception Serialization
I've found that if a broker sends a function that raises an exception initialized by kwargs, the serialization of that exception will lose those kwargs.
This causes the function exception_to_python() to not properly deserialize the exception.
from fastapi HTTPException
@broker.task
async def long_running_task(value):
print(f"[{datetime.now()}] Starting long running task {value}...")
await asyncio.sleep(5)
raise HTTPException(400, "Task failed") # ok
# raise HTTPException(status_code=400, detail="Task failed") # it does not work for deserialization
When perform exception_to_python(), it will
try:
exception = cls(*exc_msg)
except Exception:
# goes here, because `*exc_msg` can not init HTTPException class since it is empty
exception = Exception(f"{cls}({exc_msg})")
Version list:
- taskiq: 0.11.16
- python: 3.9
- pydantic: v2
Is there any way to deserialize exception with kwargs properly?
Hi, not sure if this helps, but I had a similar problem and ended up creating a wrapper function around Taskiq’s model_dump in my result backend.
The function, in a rather naive way, manually adds the exception arguments into the serialized data if it encounters a particular class of exception (the one that was causing issues).
Hi, not sure if this helps, but I had a similar problem and ended up creating a wrapper function around Taskiq’s
model_dumpin my result backend.The function, in a rather naive way, manually adds the exception arguments into the serialized data if it encounters a particular class of exception (the one that was causing issues).
Thanks for your reply! Is there any codes I can refer?
Thanks for your reply! Is there any codes I can refer?
As I mentioned, it's a very naive approach: it simply overrides the serialized fields of the exception manually.
from typing import Any, TypeVar
from taskiq import TaskiqResult
from taskiq.compat import model_dump
from app.exceptions import RestException # <- the exception at issue
_ReturnType = TypeVar("_ReturnType")
def result_dump(result: TaskiqResult) -> dict[str, Any]:
"""Wrapper around Taskiq's `model_dump` with improved handling of `RestException`."""
dump: dict[str, Any] = model_dump(result)
if result.error is None or not isinstance(result.error, RestException):
return dump
error: RestException = result.error
dump["error"]["exc_message"] = [error.detail, error.src, error.status, error.code, error.title]
return dump
class CustomRedisAsyncResultBackend(RedisAsyncResultBackend[_ReturnType]):
....
async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None:
redis_set_params: dict[str, str | int | bytes] = {
"name": self._task_name(task_id),
"value": self.serializer.dumpb(result_dump(result)),
}
...
I'm open to better solutions if anyone has suggestions.
I solved this by implementing the __setstate__ and __reduce__ methods in my custom exception class that inherits from FastAPI’s HTTPException:
from fastapi import HTTPException
class MyCustomException(HTTPException):
detail: str
status_code: int
def __setstate__(self, state) -> None:
for k, v in state.items():
setattr(self, k, v)
def __reduce__(self) -> tuple:
return (self.__class__.__new__, (self.__class__,), self.__getstate__())
I think that the issue was happening because pickle was throwing the exception when unpickling the exception instead of just returning the instance, but i'm not sure.