winton-kafka-streams
Error decoding bytes type with logicalType decimal - Avro
I have the following Avro schema for a field whose logicalType is decimal:
{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 10,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2",
        "connect.decimal.precision": "10"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
The value from the processor is a dict with string and bytes values.
The field value in bytes: b'\x00\x97\xf4'
I am unable to recover the decimal value from these bytes. I have tried the following, neither of which worked:
- data.decode() and data.decode('latin-1')
Fails with error
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x97 in position 1: invalid start byte
- BinaryDecoder
    decoder = BinaryDecoder(io.BytesIO(data))
    decoder.read_double()
Fails with error
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 240, in read_double
    ((ord(self.read(1)) & 0xff) << 48) |
  File "/IdeaProjects/virtualenv/event-venv/lib/python3.6/site-packages/avro/io.py", line 180, in read
    assert (len(input_bytes) == n), input_bytes
AssertionError: b''
How can I extract data from byte fields in Avro?
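The bytes are not text or a double; they hold the unscaled integer in big-endian two's-complement form. You can use this to decode the byte string into a Decimal: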
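import struct
from decimal import Decimal


def decode_decimal(value: bytes, num_places: int) -> Decimal:
    # Avro decimals are big-endian two's-complement integers (up to 8 bytes here).
    value_size = len(value)
    for fmt in ('>b', '>h', '>l', '>q'):
        fmt_size = struct.calcsize(fmt)
        if fmt_size >= value_size:
            # Sign-extend the padding so negative values decode correctly.
            pad_byte = b'\xff' if value and value[0] & 0x80 else b'\x00'
            padding = pad_byte * (fmt_size - value_size)
            int_value = struct.unpack(fmt, padding + value)[0]
            scale = Decimal('1') / (10 ** num_places)
            return Decimal(int_value) * scale
    raise ValueError('Could not unpack value')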
Example:
>>> decode_decimal(b'\x00\x97\xf4', 2)
Decimal('389.00')
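For comparison, here is a minimal sketch of the same decoding built on int.from_bytes, which accepts byte strings of any length and handles the sign itself. It assumes, per the Avro specification, that the bytes hold the big-endian two's-complement unscaled value. The helper names decimal_scale and decode_avro_decimal are hypothetical and not part of winton-kafka-streams or the avro package; the None check is an assumption to cover the "null" branch of the union in the schema above.

```python
from decimal import Decimal
from typing import Optional


def decimal_scale(field_schema: dict) -> int:
    """Hypothetical helper: return the scale of the decimal branch of a field."""
    types = field_schema['type']
    branches = types if isinstance(types, list) else [types]
    for branch in branches:
        if isinstance(branch, dict) and branch.get('logicalType') == 'decimal':
            # The Avro specification defaults scale to 0 when it is omitted.
            return int(branch.get('scale', 0))
    raise ValueError('No decimal logicalType in field schema')


def decode_avro_decimal(value: Optional[bytes], scale: int) -> Optional[Decimal]:
    """Hypothetical helper: decode an Avro decimal's bytes into a Decimal."""
    if value is None:
        # The field is a union with "null", so the value may be absent.
        return None
    # Interpret the bytes as a signed big-endian integer of any length.
    unscaled = int.from_bytes(value, byteorder='big', signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


# Abbreviated copy of the field schema from the question.
amount_field = {
    'name': 'amount',
    'type': ['null', {'type': 'bytes', 'scale': 2, 'precision': 10,
                      'logicalType': 'decimal'}],
    'default': None,
}

scale = decimal_scale(amount_field)
print(decode_avro_decimal(b'\x00\x97\xf4', scale))  # 389.00
print(decode_avro_decimal(b'\xff\x68\x0c', scale))  # -389.00
```

Reading the scale from the schema avoids hard-coding the number of decimal places at each call site. Note that scaleb rounds to the current decimal context (28 significant digits by default), which is ample for the precision of 10 declared here.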