MLServer
[BUG]: Kafka Message cannot be serialized to JSON when dict contains bytes
Issue
When using the KafkaServer to process inference requests with a model that produces results containing raw bytes (e.g. any recent transformer), the response cannot be written to the output topic. The resulting error:
2024-03-05 00:50:10,677 [mlserver.kafka] ERROR - ERROR 500 - Type is not JSON serializable: bytes
Traceback (most recent call last):
File "venv/lib/python3.10/site-packages/mlserver/kafka/server.py", line 72, in _process_request_cb
process_request_task.result()
File "venv/lib/python3.10/site-packages/mlserver/kafka/server.py", line 98, in _process_request
value=kafka_response.encoded_value,
File "venv/lib/python3.10/site-packages/mlserver/kafka/message.py", line 78, in encoded_value
return _encode_value(self.value)
File "venv/lib/python3.10/site-packages/mlserver/kafka/message.py", line 21, in _encode_value
return orjson.dumps(v)
TypeError: Type is not JSON serializable: bytes
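The failure can be reproduced outside of MLServer: orjson, like the standard library json module, refuses to serialize raw bytes (the payload below is illustrative):

import orjson

# bytes values are not JSON serializable, so encoding the response dict fails
orjson.dumps({"outputs": [{"data": [b"raw model output"]}]})
# TypeError: Type is not JSON serializable: bytes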
Context / How to reproduce
python --version: Python 3.10.13
mlserver --version: mlserver, version 1.4.0
requires mlserver-huggingface
model-settings.json:
{
  "name": "transformer",
  "implementation": "mlserver_huggingface.HuggingFaceRuntime",
  "parameters": {
    "extra": {
      "task": "text-generation",
      "pretrained_model": "distilgpt2"
    }
  }
}
settings.json:
{
  "debug": true,
  "kafka_enabled": true,
  "kafka_servers": "<redacted>"
}
example.py:
import json

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="<redacted>:9094")

# The mlserver-model header routes the request to the named model
headers = {
    "mlserver-model": b"transformer",
}

inference_request = {
    "inputs": [
        {
            "name": "args",
            "shape": [1],
            "datatype": "BYTES",
            "data": ["ml server cant do bytes yet"],
        }
    ]
}

producer.send(
    "mlserver-input",
    json.dumps(inference_request).encode("utf-8"),
    headers=list(headers.items()),
)
producer.flush()
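For completeness, a minimal companion consumer can be used to watch for the response; with this bug present, nothing arrives on the output topic and the error above shows up in the MLServer log instead. This sketch assumes the default output topic name "mlserver-output"; adjust it to your settings.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "mlserver-output",  # assumed default output topic; change if configured differently
    bootstrap_servers="<redacted>:9094",
    auto_offset_reset="earliest",
    consumer_timeout_ms=30000,  # stop polling after 30s of silence
)
for message in consumer:
    print(message.headers, message.value)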
Possible Solution
The HTTP server has a special encoder that makes this possible; the simplest fix would be to reuse it.
I created an initial draft of how this could be done in #1622, but it is not DRY. Once it has been vetted, I will refactor the code and write the appropriate tests.
I'm facing the same issue when my model outputs a numpy.array of strings. It works fine when invoked through the REST API, but fails when invoked via the Kafka server. I believe this is because of the difference in how the InferenceResponse object is serialized: FastAPI uses the fastapi.encoders.jsonable_encoder function, while KafkaMessage uses json.dumps(InferenceResponse.dict()) [code]
I tried applying jsonable_encoder before json.dumps in the KafkaMessage encoder function, and it seems to have solved the issue.
import json
from fastapi.encoders import jsonable_encoder

try:
    import orjson  # optional dependency, as in mlserver.kafka.message
except ImportError:
    orjson = None

def _encode_value(v: dict) -> bytes:
    # Convert unsupported types (e.g. bytes) to JSON-friendly values first
    v = jsonable_encoder(v)
    if orjson is None:
        dumped = json.dumps(v)
        return dumped.encode("utf-8")
    return orjson.dumps(v)
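A quick sanity check (with a made-up payload) shows that the jsonable_encoder pass converts bytes to str before dumping, so the original error no longer occurs:

_encode_value({"data": [b"ml server can do bytes now"]})
# b'{"data":["ml server can do bytes now"]}'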
Using pydantic's encoder as the default encoder is also a possible solution (for pydantic<2).
import json
from pydantic.json import pydantic_encoder

try:
    import orjson  # optional dependency, as in mlserver.kafka.message
except ImportError:
    orjson = None

def _encode_value(v: dict) -> bytes:
    # The default hook is only called for types the backend cannot serialize
    if orjson is None:
        dumped = json.dumps(v, default=pydantic_encoder)
        return dumped.encode("utf-8")
    return orjson.dumps(v, default=pydantic_encoder)
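This behaves slightly differently from the jsonable_encoder variant: the default hook is only invoked for values the JSON backend cannot serialize natively (such as bytes), rather than walking the whole payload up front. A quick check with a made-up value:

import orjson
from pydantic.json import pydantic_encoder

orjson.dumps({"data": [b"raw bytes"]}, default=pydantic_encoder)
# b'{"data":["raw bytes"]}'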
@hansen-young we merged a contribution from @DerTiedemann (https://github.com/SeldonIO/MLServer/pull/1622) that fixes this issue. Please submit a new ticket if there are still things that need to be fixed.