Exception Serialization

Open lryan599 opened this issue 7 months ago • 4 comments

I've found that if a broker sends a function that raises an exception initialized by kwargs, the serialization of that exception will lose those kwargs.

This causes the function exception_to_python() to not properly deserialize the exception.

import asyncio
from datetime import datetime

from fastapi import HTTPException

# `broker` is an already configured taskiq broker instance.
@broker.task
async def long_running_task(value):
    print(f"[{datetime.now()}] Starting long running task {value}...")
    await asyncio.sleep(5)
    raise HTTPException(400, "Task failed")  # OK: positional args survive serialization
    # raise HTTPException(status_code=400, detail="Task failed")  # fails to deserialize

When exception_to_python() runs, it falls into the except branch here:

    try:
        exception = cls(*exc_msg)
    except Exception:
        # execution ends up here: `exc_msg` is empty, so `cls(*exc_msg)` cannot initialize HTTPException
        exception = Exception(f"{cls}({exc_msg})")
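
To make the failure concrete, here is a minimal, dependency-free sketch. `ApiError` and `rebuild` are hypothetical names standing in for HTTPException and for the fallback logic quoted above, and the sketch assumes the serialized `exc_msg` mirrors the exception's `.args`, which is what the empty `exc_msg` suggests.

```python
class ApiError(Exception):
    """Hypothetical stand-in for an exception such as FastAPI's HTTPException."""

    def __init__(self, status_code: int, detail: str = "") -> None:
        # Attributes are set directly; nothing is forwarded to Exception.__init__.
        self.status_code = status_code
        self.detail = detail


def rebuild(cls, exc_msg):
    """Mimic the quoted fallback: try cls(*exc_msg), else wrap in a plain Exception."""
    try:
        return cls(*exc_msg)
    except Exception:
        return Exception(f"{cls}({exc_msg})")


positional = ApiError(400, "Task failed")
keyword = ApiError(status_code=400, detail="Task failed")

# BaseException records only positional constructor arguments in .args.
print(positional.args)  # (400, 'Task failed')
print(keyword.args)     # ()

print(type(rebuild(ApiError, positional.args)).__name__)  # ApiError
print(type(rebuild(ApiError, keyword.args)).__name__)     # Exception
```

With keyword arguments `.args` stays empty, so there is nothing to pass back to the constructor and the original exception type is lost.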

Version list:

  • taskiq: 0.11.16
  • python: 3.9
  • pydantic: v2

Is there any way to properly deserialize an exception that was initialized with kwargs?

lryan599, Apr 27 '25 07:04

Hi, not sure if this helps, but I had a similar problem and ended up creating a wrapper function around Taskiq’s model_dump in my result backend.

The function, in a rather naive way, manually adds the exception arguments into the serialized data if it encounters a particular class of exception (the one that was causing issues).

mromanelli9, Jul 10 '25 15:07

Thanks for your reply! Is there any code I can refer to?

lryan599, Jul 10 '25 15:07

As I mentioned, it's a very naive approach: it simply overrides the serialized fields of the exception manually.

from typing import Any, TypeVar
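from taskiq import TaskiqResult
from taskiq.compat import model_dump
from taskiq_redis import RedisAsyncResultBackend  # assumed: the backend subclassed below comes from taskiq-redis
from app.exceptions import RestException  # <- the exception at issue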

_ReturnType = TypeVar("_ReturnType")

def result_dump(result: TaskiqResult) -> dict[str, Any]:
    """Wrapper around Taskiq's `model_dump` with improved handling of `RestException`."""
    dump: dict[str, Any] = model_dump(result)
    if result.error is None or not isinstance(result.error, RestException):
        return dump

    error: RestException = result.error
    dump["error"]["exc_message"] = [error.detail, error.src, error.status, error.code, error.title]

    return dump

class CustomRedisAsyncResultBackend(RedisAsyncResultBackend[_ReturnType]):
    ...
    async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None:
        redis_set_params: dict[str, str | int | bytes] = {
            "name": self._task_name(task_id),
            "value": self.serializer.dumpb(result_dump(result)),
        }
        ...

I'm open to better solutions if anyone has suggestions.
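
One possible generalization, offered only as a sketch: instead of hard-coding the fields of one exception class, read the constructor's parameter names and look up same-named attributes. `init_args_from_attributes` and `RestError` are hypothetical names, and this only works when the exception stores each constructor argument under the parameter's own name.

```python
import inspect


def init_args_from_attributes(exc: BaseException) -> list:
    """Return positional constructor arguments recovered from same-named attributes."""
    if exc.args:
        # Positional construction already populated .args; nothing to recover.
        return list(exc.args)

    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    params = list(inspect.signature(type(exc).__init__).parameters.values())[1:]  # drop self
    recovered = []
    for param in params:
        if param.kind not in positional_kinds or not hasattr(exc, param.name):
            break  # stop at the first argument that cannot be passed positionally
        recovered.append(getattr(exc, param.name))
    return recovered


class RestError(Exception):
    """Hypothetical exception that keeps its constructor arguments as attributes."""

    def __init__(self, detail: str, status: int = 500) -> None:
        self.detail = detail
        self.status = status


original = RestError(detail="boom", status=404)
recovered_args = init_args_from_attributes(original)
rebuilt = RestError(*recovered_args)
print(recovered_args)  # ['boom', 404]
```

The recovered list could then be written into `dump["error"]["exc_message"]` in the same way as the hand-written list above.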

mromanelli9, Jul 11 '25 06:07

I solved this by implementing the __setstate__ and __reduce__ methods in my custom exception class that inherits from FastAPI’s HTTPException:

from fastapi import HTTPException


class MyCustomException(HTTPException):
    detail: str
    status_code: int

    def __setstate__(self, state) -> None:
        for k, v in state.items():
            setattr(self, k, v)

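    def __reduce__(self) -> tuple:
        # Note: the default __getstate__() exists only on Python 3.11+; on older versions use self.__dict__.copy().
        return (self.__class__.__new__, (self.__class__,), self.__getstate__())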

I think the issue was that pickle raised an error while unpickling the exception instead of just returning the instance, but I'm not sure.
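
A stdlib-only sketch of that behavior, assuming the exception is round-tripped through pickle (I have not confirmed which path taskiq takes). `PlainError` and `ReducibleError` are hypothetical classes; the state is taken from `self.__dict__` so the example also runs on Python versions older than 3.11.

```python
import pickle


class PlainError(Exception):
    """Hypothetical exception that can only be built with keyword arguments."""

    def __init__(self, *, status_code: int, detail: str) -> None:
        self.status_code = status_code
        self.detail = detail


class ReducibleError(PlainError):
    """Same exception, but it tells pickle how to rebuild it without calling __init__."""

    def __reduce__(self) -> tuple:
        # Recreate via cls.__new__(cls), then hand the attribute dict to __setstate__.
        return (self.__class__.__new__, (self.__class__,), self.__dict__.copy())

    def __setstate__(self, state) -> None:
        for key, value in state.items():
            setattr(self, key, value)


# Default behavior: unpickling calls PlainError(*args) with empty args and fails.
try:
    pickle.loads(pickle.dumps(PlainError(status_code=400, detail="Task failed")))
    plain_round_trip_failed = False
except TypeError:
    plain_round_trip_failed = True

# With __reduce__ and __setstate__ the instance and its attributes survive.
restored = pickle.loads(pickle.dumps(ReducibleError(status_code=400, detail="Task failed")))
print(plain_round_trip_failed, type(restored).__name__, restored.status_code, restored.detail)
```

The default `BaseException.__reduce__` replays only `.args`, which is empty for keyword-only construction, so the reconstruction call is what raises during unpickling.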

joaovitorsilvestre, Oct 27 '25 20:10