JSON decode error with Confluent Platform
Checklist
- [X] I use Confluent Kafka image
- [X] I use a JSON schema for my topic
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "additionalProperties": false,
  "properties": {
    "frame": {
      "type": "string"
    },
    "time": {
      "format": "int64",
      "type": "integer"
    }
  },
  "required": [
    "time",
    "frame"
  ],
  "title": "RawData",
  "type": "object"
}
Steps to reproduce
I have the following code:
import faust


class RawData(faust.Record):
    frame: str
    time: int


app = faust.App(
    id="test3",
    broker='kafka://localhost:9092',
    web_port=8080,
    processing_guarantee="exactly_once",
)

rawdata_topic = app.topic("RawDataSigfox")


@app.agent(rawdata_topic)
async def consumer(events):
    async for rd in events:
        print("rawdata", rd.time, rd.frame)


if __name__ == '__main__':
    worker = faust.Worker(app, loglevel="INFO")
    worker.execute_from_commandline()
Another service produces messages that conform to the schema above, using the Confluent library and Schema Registry, and I want to read them from my Python code.
Expected behavior
I expect to see something like rawdata 1576320429 xxxxxxxx in the console.
Actual behavior
If I don't set value_serializer="raw" on the topic, it shows me:
faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)
Finding that weird, I added the property value_serializer="raw" to the topic.
After that I get this:
b'\x00\x00\x00\x00\x16{"time":1576320429,"frame":"0000bb7465f504abff000000000000009902871600000000000000000b"}'
I also tried consuming the topic directly with the Confluent library, without going through Faust, and it handles the message without any problem.
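A note on the raw payload shown above, offered as a likely explanation rather than a confirmed diagnosis: the five bytes before the opening brace match the framing that Confluent's Schema Registry serializers put in front of every message, namely one magic byte (0) followed by a 4-byte big-endian schema ID (here 0x16, i.e. 22). Faust's json codec hands the whole value to json.loads, which would fail at column 1 on that prefix. The sketch below strips the prefix by hand using only the standard library. The function name decode_confluent_json is hypothetical and is not part of Faust or the Confluent client.

```python
import json
import struct

# Confluent wire format header: 1-byte magic (always 0) + 4-byte big-endian schema ID.
HEADER = struct.Struct(">BI")
MAGIC_BYTE = 0


def decode_confluent_json(raw: bytes):
    """Split a Schema Registry framed message into (schema_id, decoded JSON).

    Hypothetical helper for illustration; it does not contact the registry
    and does not validate the payload against the schema.
    """
    if len(raw) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = HEADER.unpack_from(raw)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic!r}")
    payload = json.loads(raw[HEADER.size:].decode("utf-8"))
    return schema_id, payload


# The bytes received with value_serializer="raw" in this report.
raw = (
    b'\x00\x00\x00\x00\x16{"time":1576320429,'
    b'"frame":"0000bb7465f504abff000000000000009902871600000000000000000b"}'
)
schema_id, record = decode_confluent_json(raw)
print("rawdata", record["time"], record["frame"])
```

Inside Faust, the same logic could presumably live in the _loads method of a custom codec (a faust.Codec subclass registered through faust.serializers.codecs.register) and be selected with value_serializer on the topic. That wiring is untested here.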
Full traceback
[2020-08-28 19:35:53,980] [19848] [ERROR] [^----Agent*: main.consumer]: Crashed reason=ValueDecodeError('Expecting value: line 1 column 1 (char 0)',)
Traceback (most recent call last):
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value
    payload = self._loads(serializer, value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads
    return loads(serializer, data)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads
    return get_codec(codec).loads(s) if codec else s
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads
    s = cast(Codec, node)._loads(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads
    return json.loads(want_str(s))
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads
    return json_loads(s, **kwargs)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\agents\agent.py", line 647, in _execute_actor
    await coro
  File "C:/Users/Bluebeel/Documents/Codes/Shayp/Shayp.Ingestion.Workers/app/app.py", line 18, in consumer
    async for rd in events:
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\streams.py", line 860, in _py_aiter
    raise chan_errors.popleft()
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\transport\conductor.py", line 95, in on_message
    event = await chan.decode(message, propagate=True)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 146, in decode
    v: V = schema_loads_value(app, message, loads=loads_value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 85, in loads_value
    serializer=serializer or self.value_serializer,
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 105, in loads_value
    sys.exc_info()[2]) from exc
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value
    payload = self._loads(serializer, value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads
    return loads(serializer, data)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads
    return get_codec(codec).loads(s) if codec else s
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads
    s = cast(Codec, node)._loads(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads
    return json.loads(want_str(s))
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads
    return json_loads(s, **kwargs)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)
Process finished with exit code -1
Versions
* Python 3.6.8
* Faust 1.10.4
* Windows 10
* Kafka 2.5
@bluebeel did you have any luck with this? I am facing the same issue, and I'm surprised there is so little help on this in the Faust documentation or even on Stack Overflow.
Hello @cfloressuazo, I did not find a solution and in the meantime we decided not to use faust ^^'
This problem still occurs.