
JSON decode error with Confluent Platform

Open bluebeel opened this issue 4 years ago • 3 comments

Checklist

  • [X] I use Confluent Kafka image
  • [X] I use a JSON schema for my topic
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "additionalProperties": false,
  "properties": {
    "frame": {
      "type": "string"
    },
    "time": {
      "format": "int64",
      "type": "integer"
    }
  },
  "required": [
    "time",
    "frame"
  ],
  "title": "RawData",
  "type": "object"
}

Steps to reproduce

I have the following code:

import faust

class RawData(faust.Record):
    frame: str
    time: int

app = faust.App(
    id="test3",
    broker='kafka://localhost:9092',
    web_port=8080,
    processing_guarantee="exactly_once",
)

rawdata_topic = app.topic("RawDataSigfox")

@app.agent(rawdata_topic)
async def consumer(events):
    async for rd in events:
        print("rawdata", rd.time, rd.frame)


if __name__ == '__main__':
    worker = faust.Worker(app, loglevel="INFO")
    worker.execute_from_commandline()

Another service produces messages that conform to the schema above, using the Confluent client library and Schema Registry. I want to read those messages from my Python code.

Expected behavior

I expect to see something like rawdata 1576320429 xxxxxxxx in the console.

Actual behavior

Without value_serializer="raw", it shows: faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)

Finding this odd, I added value_serializer="raw" to the topic. After that, the agent receives: b'\x00\x00\x00\x00\x16{"time":1576320429,"frame":"0000bb7465f504abff000000000000009902871600000000000000000b"}'
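A note on the raw value above: the five bytes in front of the JSON look like the Confluent wire format, in which a Schema Registry serializer prepends a magic byte (0) and a 4-byte big-endian schema ID to every message. The sketch below only inspects that prefix with the standard library. The helper name `split_confluent_header` is made up for illustration, and reading the prefix this way assumes the producer really does use a Schema Registry serializer.

```python
import struct

# Raw value copied from the output above.
raw = (
    b'\x00\x00\x00\x00\x16{"time":1576320429,'
    b'"frame":"0000bb7465f504abff000000000000009902871600000000000000000b"}'
)


def split_confluent_header(data: bytes):
    """Split a message into (magic_byte, schema_id, payload).

    Hypothetical helper: assumes the Confluent wire format of
    1 magic byte followed by a 4-byte big-endian schema ID.
    """
    if len(data) < 5:
        raise ValueError("message too short to carry a 5-byte header")
    magic, schema_id = struct.unpack(">bI", data[:5])
    return magic, schema_id, data[5:]


magic, schema_id, payload = split_confluent_header(raw)
print("magic byte:", magic)
print("schema id:", schema_id)
print("payload starts with:", payload[:8])
```

If this reading is right, the JSON document itself only begins at byte offset 5, which would explain why a plain JSON decoder rejects the message at its very first character.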

I also tried consuming the topic directly with the Confluent client library, without going through Faust, and it handles the messages without any problem.
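One possible reason for the difference: the Confluent deserializer knows about the 5-byte Schema Registry prefix, while a plain JSON decoder does not. The following standard-library sketch reproduces the same error message on a framed message and then decodes it after skipping the prefix. The function names are hypothetical, the `frame` value is a placeholder, and the 5-byte header size is an assumption based on the Confluent wire format.

```python
import json

HEADER_SIZE = 5  # assumed: 1 magic byte + 4-byte schema ID


def decode_plain(data: bytes) -> dict:
    """Decode the bytes as bare JSON, with no knowledge of any framing."""
    return json.loads(data.decode("utf-8"))


def decode_confluent_json(data: bytes) -> dict:
    """Skip the assumed 5-byte prefix, then decode the rest as JSON."""
    if len(data) <= HEADER_SIZE or data[0] != 0:
        raise ValueError("not a Confluent-framed message")
    return json.loads(data[HEADER_SIZE:].decode("utf-8"))


# Same layout as the raw value in this issue, with a placeholder frame.
framed = b'\x00\x00\x00\x00\x16{"time":1576320429,"frame":"xxxxxxxx"}'

error_message = None
try:
    decode_plain(framed)
except json.JSONDecodeError as exc:
    error_message = str(exc)
    print("plain decoder failed:", error_message)

record = decode_confluent_json(framed)
print("rawdata", record["time"], record["frame"])
```

In a Faust application, the same prefix-skipping logic could be applied to the bytes received with value_serializer="raw", or wrapped in a custom codec. This is an untested suggestion, not a confirmed fix.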

Full traceback

[2020-08-28 19:35:53,980] [19848] [ERROR] [^----Agent*: main.consumer]: Crashed reason=ValueDecodeError('Expecting value: line 1 column 1 (char 0)',)
Traceback (most recent call last):
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value
    payload = self._loads(serializer, value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads
    return loads(serializer, data)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads
    return get_codec(codec).loads(s) if codec else s
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads
    s = cast(Codec, node)._loads(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads
    return json.loads(want_str(s))
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads
    return json_loads(s, **kwargs)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\agents\agent.py", line 647, in _execute_actor
    await coro
  File "C:/Users/Bluebeel/Documents/Codes/Shayp/Shayp.Ingestion.Workers/app/app.py", line 18, in consumer
    async for rd in events:
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\streams.py", line 860, in _py_aiter
    raise chan_errors.popleft()
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\transport\conductor.py", line 95, in on_message
    event = await chan.decode(message, propagate=True)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 146, in decode
    v: V = schema_loads_value(app, message, loads=loads_value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\schemas.py", line 85, in loads_value
    serializer=serializer or self.value_serializer,
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 105, in loads_value
    sys.exc_info()[2]) from exc
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 99, in loads_value
    payload = self._loads(serializer, value)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\registry.py", line 64, in _loads
    return loads(serializer, data)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 368, in loads
    return get_codec(codec).loads(s) if codec else s
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 233, in loads
    s = cast(Codec, node)._loads(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\serializers\codecs.py", line 258, in _loads
    return json.loads(want_str(s))
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\site-packages\faust\utils\json.py", line 195, in loads
    return json_loads(s, **kwargs)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Users\Bluebeel\AppData\Local\Programs\Python\Python36\lib\json\decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
faust.exceptions.ValueDecodeError: Expecting value: line 1 column 1 (char 0)

Process finished with exit code -1


Versions

  • Python 3.6.8
  • Faust 1.10.4
  • Windows 10
  • Kafka 2.5

bluebeel, Aug 28 '20 17:08

@bluebeel did you have any luck with this? I am facing the same issue, and I'm surprised there is so little about it in the Faust documentation or even on Stack Overflow.

cfloressuazo, Dec 19 '20 14:12

Hello @cfloressuazo, I did not find a solution, and in the meantime we decided not to use Faust ^^'

bluebeel, Dec 22 '20 10:12

This problem still occurs.

AminovE99, Aug 16 '22 08:08