faststream
Bug: Custom Serialization/Deserialization logic in Redis doesn't work unless UTF-8 serializable
Describe the bug
I am working on a middleware that adds msgpack serialization to all messages (using ormsgpack, which natively handles Pydantic models).
The issue seems to be that FastStream JSON-serializes the message body together with the headers here:
# faststream/redis/parser.py
class RawMessage:
    # line: 85
    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: Optional[str],
        headers: Optional["AnyDict"],
        correlation_id: str,
    ) -> bytes:
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )
        return dump_json(
            {
                "data": msg.data,
                "headers": msg.headers,
            }
        )
Technically, msg.data is allowed to be bytes, but in practice it has to be valid UTF-8, or dump_json raises an exception:
/.../.venv/lib/python3.12/site-packa │
│ ges/faststream/redis/publisher/producer.py:79 in publish │
│ │
│ 76 │ │ │ psub = self._connection.pubsub() │
│ 77 │ │ │ await psub.subscribe(reply_to) │
│ 78 │ │ │
│ ❱ 79 │ │ msg = RawMessage.encode( │
│ 80 │ │ │ message=message, │
│ 81 │ │ │ reply_to=reply_to, │
│ 82 │ │ │ headers=headers, │
│ │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │ channel = None │ │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43' │ │
│ │ headers = None │ │
│ │ list = 'job-queue' │ │
│ │ maxlen = None │ │
│ │ message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far' │ │
│ │ psub = None │ │
│ │ raise_timeout = False │ │
│ │ reply_to = '' │ │
│ │ rpc = False │ │
│ │ rpc_timeout = 30.0 │ │
│ │ self = <faststream.redis.publisher.producer.RedisFastProducer object at │ │
│ │ 0x1137972f0> │ │
│ │ stream = None │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/redis/parser.py:101 in encode │
│ │
│ 98 │ │ │ correlation_id=correlation_id, │
│ 99 │ │ ) │
│ 100 │ │ │
│ ❱ 101 │ │ return dump_json( │
│ 102 │ │ │ { │
│ 103 │ │ │ │ "data": msg.data, │
│ 104 │ │ │ │ "headers": msg.headers, │
│ │
│ ╭────────────────────────────────── locals ───────────────────────────────────╮ │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43' │ │
│ │ headers = None │ │
│ │ message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far' │ │
│ │ msg = <faststream.redis.parser.RawMessage object at 0x10e69c790> │ │
│ │ reply_to = '' │ │
│ ╰─────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/_compat.py:93 in dump_json │
│ │
│ 90 │ │ return to_jsonable_python(model, **kwargs) │
│ 91 │ │
│ 92 │ def dump_json(data: Any) -> bytes: │
│ ❱ 93 │ │ return json_dumps(model_to_jsonable(data)) │
│ 94 │ │
│ 95 │ def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]: │
│ 96 │ │ return model.model_fields │
│ │
│ ╭───────────────────────────────────── locals ─────────────────────────────────────╮ │
│ │ data = { │ │
│ │ │ 'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far', │ │
│ │ │ 'headers': { │ │
│ │ │ │ 'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43' │ │
│ │ │ } │ │
│ │ } │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /.../.venv/lib/python3.12/site-packa │
│ ges/faststream/_compat.py:90 in model_to_jsonable │
│ │
│ 87 │ │ model: BaseModel, │
│ 88 │ │ **kwargs: Any, │
│ 89 │ ) -> Any: │
│ ❱ 90 │ │ return to_jsonable_python(model, **kwargs) │
│ 91 │ │
│ 92 │ def dump_json(data: Any) -> bytes: │
│ 93 │ │ return json_dumps(model_to_jsonable(data)) │
│ │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │ kwargs = {} │ │
│ │ model = { │ │
│ │ │ 'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far', │ │
│ │ │ 'headers': { │ │
│ │ │ │ 'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43' │ │
│ │ │ } │ │
│ │ } │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────────────────────────────────────────────────────────────────────╯
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x82 in position 0: invalid utf-8
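The final error can be reproduced outside FastStream. This is a minimal sketch, assuming (as the traceback suggests) that the JSON dump step tries to turn bytes into a UTF-8 string. The payload is the one shown in the locals above, and try_utf8 is a hypothetical helper used only for illustration:

```python
# Payload copied from the traceback locals: msgpack output, not valid UTF-8.
payload = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"


def try_utf8(data: bytes) -> str:
    """Return the decoded text, or the error message if decoding fails."""
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError as exc:
        return f"error: {exc}"


# The first byte (0x82) is a msgpack map marker, which cannot start a UTF-8 sequence.
result = try_utf8(payload)
print(result)
```

The exact wording of the message differs between Python's built-in codec and the one in the traceback, but the failing byte and position are the same.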
Here's some test code:
@app.after_startup
async def startup():
    faker = Faker("en_US")
    urls = [faker.url() for _ in range(10)]
    for url in tqdm.tqdm(urls):
        obj = Request(
            url=url,
            type=faker.word(),
        )
        await broker.publish(
            ormsgpack.packb(obj, option=ormsgpack.OPT_SERIALIZE_PYDANTIC),
            list="job-queue",
        )
The error occurs on the producer side, and the only workaround I've found so far is monkey-patching the RawMessage.encode method at runtime so that the msgpack serialization happens in the final message encoding phase. This complicates the parsing side, though, because it breaks normal message parsing (which is needed to get the message headers, correlation_id, etc.).
Any suggestions? Perhaps additional middleware hooks are needed for the final message serialization and the initial message deserialization, so that serialization formats that produce non-UTF-8 binary are supported?
The only other option I can think of is base64 encoding the binary before message serialization which would kind of defeat the space-saving purpose of using a binary format.
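To make the base64 trade-off concrete, here is a rough sketch using only the standard library. It assumes a JSON envelope with "data" and "headers" keys like the one in RawMessage.encode above; the variable names are illustrative and not part of FastStream:

```python
import base64
import json

# Same msgpack payload and correlation_id as in the traceback above.
payload = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
headers = {"correlation_id": "2ac5886a-736d-4712-b878-448b8b041f43"}

# base64 turns arbitrary bytes into ASCII text, which JSON can carry.
encoded = base64.b64encode(payload).decode("ascii")
envelope = json.dumps({"data": encoded, "headers": headers}).encode("utf-8")

# The consumer has to reverse both steps to get the original bytes back.
decoded = json.loads(envelope)
restored = base64.b64decode(decoded["data"])

# base64 emits 4 output characters for every 3 input bytes (plus padding).
print(len(payload), len(encoded))
```

The round trip works, but the body grows by roughly a third before the JSON wrapper is even counted, which is the cost described above.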
Related: https://github.com/airtai/faststream/issues/1255
You can pass the default redis-py options directly to the broker constructor to control this behavior: https://github.com/airtai/faststream/blob/main/faststream/redis/broker/broker.py#L113-L117
Sorry, I only understood the problem just now. Here is a full MRE:
from msgpack import packb

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("tests")
async def handler(data, logger: Logger):
    logger.info(f"Received data: {data}")


@app.after_startup
async def start():
    await broker.publish(
        packb({"id": "12345678" * 4, "date": "2021-01-01T00:00:00Z"}),
        "tests",
    )
I think we should redesign the message format we use for publishing.
Is the issue that you have to wrap whatever the client message is so that you get the necessary metadata you need on the receiving end?
Yeah, we have to use headers. That's why we made our own message format, but I designed it badly, and you ran into a serialization problem. I think we should redesign the RawMessage encode and parse methods so that they can serialize any data.
I might be missing something, but it looks like swapping JSON for msgpack would be enough to resolve the problem.
It's not one of our dependencies, so we can't use it there.
Encountered this while using protobuf so +1 on this too.
Hi @Lancetnik. I've encountered the same problem while using msgpack with FastStream and Redis. Do you have any ideas about how to redesign the RawMessage class? I could help with the implementation. My suggestion is to use Python's built-in struct.pack and struct.unpack functions to deal with binary data.
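One way the struct-based idea could look is sketched below. This is only an illustration of the approach, not FastStream's actual format: the layout (a 4-byte big-endian header length, then JSON headers, then the raw payload) and the names encode_envelope and decode_envelope are assumptions made for this example.

```python
import json
import struct

# Hypothetical layout: [4-byte big-endian header length][JSON headers][raw payload]
_PREFIX = struct.Struct(">I")


def encode_envelope(data: bytes, headers: dict) -> bytes:
    """Pack JSON headers and an arbitrary binary payload into one bytes object."""
    header_bytes = json.dumps(headers).encode("utf-8")
    return _PREFIX.pack(len(header_bytes)) + header_bytes + data


def decode_envelope(raw: bytes) -> tuple[bytes, dict]:
    """Split an envelope back into (payload, headers)."""
    (header_len,) = _PREFIX.unpack_from(raw, 0)
    start = _PREFIX.size
    end = start + header_len
    if end > len(raw):
        raise ValueError("truncated envelope")
    headers = json.loads(raw[start:end].decode("utf-8"))
    # Everything after the headers is the payload, returned untouched.
    return raw[end:], headers


payload = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
headers = {"correlation_id": "2ac5886a-736d-4712-b878-448b8b041f43"}

raw = encode_envelope(payload, headers)
restored_payload, restored_headers = decode_envelope(raw)
```

Because the payload is never decoded as text, any binary format (msgpack, protobuf) passes through unchanged, and the headers stay readable on the consumer side. The sketch does not address compatibility with messages already published in the current JSON format.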