
Bug: Custom Serialization/Deserialization logic in Redis doesn't work unless UTF-8 serializable

Open chrisgoddard opened this issue 10 months ago • 8 comments

Describe the bug

I am working on a middleware that adds msgpack serialization to all messages (using ormsgpack, which natively handles Pydantic models).

The issue seems to be that FastStream JSON-serializes the message together with its headers here:

# faststream/redis/parser.py
class RawMessage:
# line: 85
    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: Optional[str],
        headers: Optional["AnyDict"],
        correlation_id: str,
    ) -> bytes:
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )

        return dump_json(
            {
                "data": msg.data,
                "headers": msg.headers,
            }
        )

Technically, msg.data is supposed to accept bytes, but in practice it has to be UTF-8 compatible or an exception is raised:

/.../.venv/lib/python3.12/site-packa │
│ ges/faststream/redis/publisher/producer.py:79 in publish                              │
│                                                                                       │
│    76 │   │   │   psub = self._connection.pubsub()                                    │
│    77 │   │   │   await psub.subscribe(reply_to)                                      │
│    78 │   │                                                                           │
│ ❱  79 │   │   msg = RawMessage.encode(                                                │
│    80 │   │   │   message=message,                                                    │
│    81 │   │   │   reply_to=reply_to,                                                  │
│    82 │   │   │   headers=headers,                                                    │
│                                                                                       │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │        channel = None                                                             │ │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'                           │ │
│ │        headers = None                                                             │ │
│ │           list = 'job-queue'                                                      │ │
│ │         maxlen = None                                                             │ │
│ │        message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'               │ │
│ │           psub = None                                                             │ │
│ │  raise_timeout = False                                                            │ │
│ │       reply_to = ''                                                               │ │
│ │            rpc = False                                                            │ │
│ │    rpc_timeout = 30.0                                                             │ │
│ │           self = <faststream.redis.publisher.producer.RedisFastProducer object at │ │
│ │                  0x1137972f0>                                                     │ │
│ │         stream = None                                                             │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/redis/parser.py:101 in encode                                          │
│                                                                                       │
│    98 │   │   │   correlation_id=correlation_id,                                      │
│    99 │   │   )                                                                       │
│   100 │   │                                                                           │
│ ❱ 101 │   │   return dump_json(                                                       │
│   102 │   │   │   {                                                                   │
│   103 │   │   │   │   "data": msg.data,                                               │
│   104 │   │   │   │   "headers": msg.headers,                                         │
│                                                                                       │
│ ╭────────────────────────────────── locals ───────────────────────────────────╮       │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'                     │       │
│ │        headers = None                                                       │       │
│ │        message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'         │       │
│ │            msg = <faststream.redis.parser.RawMessage object at 0x10e69c790> │       │
│ │       reply_to = ''                                                         │       │
│ ╰─────────────────────────────────────────────────────────────────────────────╯       │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/_compat.py:93 in dump_json                                             │
│                                                                                       │
│    90 │   │   return to_jsonable_python(model, **kwargs)                              │
│    91 │                                                                               │
│    92 │   def dump_json(data: Any) -> bytes:                                          │
│ ❱  93 │   │   return json_dumps(model_to_jsonable(data))                              │
│    94 │                                                                               │
│    95 │   def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:             │
│    96 │   │   return model.model_fields                                               │
│                                                                                       │
│ ╭───────────────────────────────────── locals ─────────────────────────────────────╮  │
│ │ data = {                                                                         │  │
│ │        │   'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',           │  │
│ │        │   'headers': {                                                          │  │
│ │        │   │   'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'          │  │
│ │        │   }                                                                     │  │
│ │        }                                                                         │  │
│ ╰──────────────────────────────────────────────────────────────────────────────────╯  │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/_compat.py:90 in model_to_jsonable                                     │
│                                                                                       │
│    87 │   │   model: BaseModel,                                                       │
│    88 │   │   **kwargs: Any,                                                          │
│    89 │   ) -> Any:                                                                   │
│ ❱  90 │   │   return to_jsonable_python(model, **kwargs)                              │
│    91 │                                                                               │
│    92 │   def dump_json(data: Any) -> bytes:                                          │
│    93 │   │   return json_dumps(model_to_jsonable(data))                              │
│                                                                                       │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │ kwargs = {}                                                                       │ │
│ │  model = {                                                                        │ │
│ │          │   'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',          │ │
│ │          │   'headers': {                                                         │ │
│ │          │   │   'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'         │ │
│ │          │   }                                                                    │ │
│ │          }                                                                        │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────────────────────────────────────────────────────────────────────╯
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x82 in position 0: invalid utf-8
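
The failure can be reproduced without FastStream or Redis. This is a minimal sketch, assuming the JSON step ends up decoding the bytes as UTF-8 (which is what the final error suggests); the helper name `try_decode` is made up for illustration, and the payload is copied from the traceback above.

```python
# msgpack payload copied from the `message` local in the traceback.
payload = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"


def try_decode(data: bytes) -> str:
    """Return the decoded text, or a short description of the failure."""
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError as exc:
        return f"UnicodeDecodeError: {exc.reason} at position {exc.start}"


# JSON text is valid UTF-8, so it decodes cleanly.
print(try_decode(b'{"url": "http://kelly.com/"}'))

# The msgpack map header 0x82 is not a valid UTF-8 start byte, so this fails.
print(try_decode(payload))
```

Any binary format whose output is not valid UTF-8 (msgpack, protobuf, and so on) would hit the same wall at this step.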

Here's some test code:


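import ormsgpack
import tqdm
from faker import Faker

# `app`, `broker` and the Pydantic model `Request` (with `url` and `type`
# fields) are assumed to be defined elsewhere in the application.

@app.after_startup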
async def startup():
    faker = Faker("en_US")

    urls = [faker.url() for _ in range(10)]

    for url in tqdm.tqdm(urls):
        obj = Request(
            url=url,
            type=faker.word(),
        )
        await broker.publish(
            ormsgpack.packb(obj, option=ormsgpack.OPT_SERIALIZE_PYDANTIC),
            list="job-queue",
        )

The error occurs on the producer side. The only workaround I've found so far is a runtime monkey-patch of the RawMessage.encode method, so that the msgpack serialization happens at the final message encoding phase. This complicates things on the parsing side, though, because it breaks the normal message parsing (i.e. reading the message headers, correlation_id, etc.).

Any suggestions? Perhaps additional middleware hooks are needed for the final message serialization and the initial message deserialization, so that serialization formats that produce non-UTF-8-compatible binary are supported?

The only other option I can think of is base64 encoding the binary before message serialization which would kind of defeat the space-saving purpose of using a binary format.
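
To put a rough number on that overhead, here is a small sketch using only the standard library. The payload is the one from the traceback above; the variable names are illustrative.

```python
import base64

# msgpack payload copied from the traceback (32 bytes).
payload = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"

# Base64 maps every 3 input bytes to 4 ASCII characters (plus padding),
# so the encoded form is roughly a third larger than the input.
encoded = base64.b64encode(payload)
overhead = len(encoded) / len(payload) - 1

print(f"raw: {len(payload)} bytes, base64: {len(encoded)} bytes, overhead: {overhead:.1%}")

# The result is plain ASCII, so it survives the JSON step and round-trips.
assert base64.b64decode(encoded) == payload
```

The encoded value is JSON-safe, but the size increase applies to every message, on top of the JSON envelope itself.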

Related: https://github.com/airtai/faststream/issues/1255

chrisgoddard avatar Feb 05 '25 15:02 chrisgoddard

You can pass the default redis-py options directly to the broker constructor to control this behavior: https://github.com/airtai/faststream/blob/main/faststream/redis/broker/broker.py#L113-L117

Lancetnik avatar Feb 05 '25 16:02 Lancetnik

Sorry, I only understood the problem just now. Here is a full MRE:

from msgpack import packb

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber("tests")
async def handler(data, logger: Logger):
    logger.info(f"Received data: {data}")

@app.after_startup
async def start():
    await broker.publish(
        packb({"id": "12345678" * 4, "date": "2021-01-01T00:00:00Z"}),
        "tests",
    )

Lancetnik avatar Feb 22 '25 14:02 Lancetnik

I think we should redesign the message format we use for publishing.

Lancetnik avatar Feb 22 '25 14:02 Lancetnik

Is the issue that you have to wrap whatever the client message is so that you get the necessary metadata you need on the receiving end?

chrisgoddard avatar Feb 24 '25 21:02 chrisgoddard

> Is the issue that you have to wrap whatever the client message is so that you get the necessary metadata you need on the receiving end?

Yeah, we have to use headers. That's why we made our own message format, but I designed it badly, and you ran into a serialization problem. I think we should redesign RawMessage (its encode and parse methods) so that it can serialize any data.

Lancetnik avatar Feb 25 '25 16:02 Lancetnik

I might be missing something. It looks like swapping JSON for msgpack would be enough to resolve the problem.

Yakov-Varnaev avatar Feb 27 '25 15:02 Yakov-Varnaev

> I might be missing something. It looks like swapping JSON for msgpack would be enough to resolve the problem.

It's not one of our dependencies, so we can't use it there.

Lancetnik avatar Feb 27 '25 20:02 Lancetnik

I ran into this while using protobuf, so +1 from me too.

clippered avatar Mar 19 '25 01:03 clippered

Hi @Lancetnik. I've run into the same problem while using msgpack with FastStream and Redis. Do you have any ideas about how to redesign the RawMessage class? I could help with the implementation. My suggestion is to use Python's built-in struct.pack and struct.unpack functions to handle the binary data.

ilya-4real avatar Jun 09 '25 13:06 ilya-4real
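
For readers following the thread, the struct-based idea could look roughly like the sketch below. It is only an illustration under stated assumptions, not FastStream's actual or planned format: the envelope layout and the names `encode_envelope` and `decode_envelope` are hypothetical. Headers stay JSON-encoded, while the body is carried as raw bytes behind a length prefix, so it never has to be valid UTF-8.

```python
import json
import struct
from typing import Any, Dict, Tuple

# Hypothetical envelope layout (an assumption, not FastStream's format):
#   4-byte big-endian unsigned header length | JSON headers | raw body bytes
_HEADER_LEN = struct.Struct(">I")


def encode_envelope(data: bytes, headers: Dict[str, Any]) -> bytes:
    """Frame JSON headers and an arbitrary binary body into one bytes value."""
    raw_headers = json.dumps(headers).encode("utf-8")
    return _HEADER_LEN.pack(len(raw_headers)) + raw_headers + data


def decode_envelope(raw: bytes) -> Tuple[bytes, Dict[str, Any]]:
    """Split a framed message back into (body, headers)."""
    (header_len,) = _HEADER_LEN.unpack_from(raw, 0)
    start = _HEADER_LEN.size
    end = start + header_len
    if end > len(raw):
        raise ValueError("truncated envelope: header length exceeds message size")
    headers = json.loads(raw[start:end].decode("utf-8"))
    return raw[end:], headers


# Round-trip the msgpack payload from the original report.
body = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
meta = {"correlation_id": "2ac5886a-736d-4712-b878-448b8b041f43"}

framed = encode_envelope(body, meta)
assert decode_envelope(framed) == (body, meta)
```

A real redesign would also have to stay compatible with messages already published in the current JSON format, which this sketch does not attempt.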