winton-kafka-streams

Error decoding bytes type with logicalType decimal - Avro

Open · arhantpanda opened this issue 4 years ago • 1 comment

I have the following Avro schema for a field that is a decimal logicalType.

{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 10,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2",
        "connect.decimal.precision": "10"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}

The value from the processor is a dict with string and bytes values. The field value in bytes is: b'\x00\x97\xf4'

I am unable to get the decimal value to proceed further. I've tried the following which did not work.

  1. data.decode() & data.decode('latin-1')
Fails with error
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x97 in position 1: invalid start byte
  2. BinaryDecoder
decoder = BinaryDecoder(io.BytesIO(data))
decoder.read_double()

Fails with error
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 240, in read_double
    ((ord(self.read(1)) & 0xff) << 48) |
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 180, in read
    assert (len(input_bytes) == n), input_bytes
AssertionError: b''

How can I extract data from byte fields in Avro?

arhantpanda, Nov 01 '19 05:11

You can use this to decode the byte string into decimal.

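import struct
from decimal import Decimal


def decode_decimal(value: bytes, num_places: int) -> Decimal:
    value_size = len(value)
    # Avro stores the unscaled value as big-endian two's complement,
    # so pad with 0xff when the sign bit is set to keep negatives correct.
    pad_byte = b'\xff' if value[:1] >= b'\x80' else b'\x00'
    for fmt in ('>b', '>h', '>l', '>q'):
        fmt_size = struct.calcsize(fmt)
        if fmt_size >= value_size:
            padding = pad_byte * (fmt_size - value_size)
            int_value = struct.unpack(fmt, padding + value)[0]
            scale = Decimal('1') / (10 ** num_places)
            return Decimal(int_value) * scale
    raise ValueError('Could not unpack value')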

Ex:

>>> decode_decimal(b'\x00\x97\xf4', 2)
Decimal('389.00')
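
The same decoding can also be sketched with the built-in `int.from_bytes`, which accepts any byte length and sign-extends negative values. This assumes the field holds Avro's decimal encoding, i.e. the unscaled integer as big-endian two's-complement bytes, and that the scale (2 in the schema above) is passed in by the caller. The name `decode_avro_decimal` is hypothetical and not part of winton-kafka-streams or the avro package.

```python
from decimal import Decimal


def decode_avro_decimal(raw: bytes, scale: int) -> Decimal:
    """Decode an Avro `decimal` stored as big-endian two's-complement bytes."""
    # signed=True interprets the high bit as the sign, for any byte length.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # scaleb(-scale) shifts the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


print(decode_avro_decimal(b"\x00\x97\xf4", 2))  # 389.00
print(decode_avro_decimal(b"\xff", 2))          # -0.01
```

Unlike the `struct`-based loop, this version is not limited to 8 bytes, which may matter for schemas with a large precision.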

ryananguiano, Jun 14 '21 06:06