
AvroSource -> JsonFormat does not output Decimals correctly

Open simplesteph opened this issue 8 years ago • 22 comments

If a field has the logical type Decimal, its underlying primitive type is bytes, and it seems this connector does not attempt to create a BigDecimal from those bytes; instead it converts them to a String that is not usable.

Example:

{ "currency_value":"BfXhAA==" }

Steps to reproduce

  1. Take Debezium as a source and create a table with a decimal field. Make sure the data is stored as Avro by Kafka Connect.
  2. Use the S3 sink and set the output format to "format.class": "io.confluent.connect.s3.format.json.JsonFormat".

simplesteph avatar May 15 '17 03:05 simplesteph

In fact, similar behaviour happens with AvroFormat. Decimals are simply not represented correctly at all. The only way I could work around this was to cast the values to strings before producing them to Kafka, and then cast them back to decimals manually on the other end.

dimon222 avatar Jul 17 '17 20:07 dimon222

Yeah, the challenge with these pipelines is that it can sometimes be difficult to find the real source of the problem.

It's possible this will be fixed in the next version of the AvroConverter since we have a newer version of Avro now: https://github.com/confluentinc/schema-registry/issues/434. However, I doubt that is the issue, since Avro 1.7.7 already supported the decimal type. But I also don't see an obvious place where the loss of type information creeps in: JsonFormat just uses JsonConverter to convert the data, and JsonConverter should be handling the decimal logical type.

We'll probably need a test case or more info about variants of converters that do/do not work (e.g. does JSON with schemas enabled on the first debezium step get the info through ok?) to make more progress on this.

ewencp avatar Jul 18 '17 00:07 ewencp

Hello, we have a similar problem with decimals (this time coming from Oracle via the JDBC connector). It seems to me the problem is the logical-type handling in JsonConverter. It works like this: value.unscaledValue().toByteArray() (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L69), which gives "a byte array containing the two's-complement representation of BigInteger". Serialized to JSON, that gives this unreadable string...
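As a minimal illustration (my own sketch, not code from the connector), the same byte representation turns into an opaque Base64 string once it is written as JSON:

import java.math.BigDecimal;
import java.util.Base64;

public class DecimalBytesDemo {
    public static void main(String[] args) {
        BigDecimal value = new BigDecimal("999.99");               // scale 2
        // Same bytes Decimal.fromLogical() produces: the unscaled value in two's complement.
        byte[] unscaled = value.unscaledValue().toByteArray();
        // In the JSON output the byte array is rendered as Base64.
        System.out.println(Base64.getEncoder().encodeToString(unscaled)); // prints "AYaf", not 999.99
    }
}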

mproch avatar Aug 22 '17 10:08 mproch

@mproch the problem also happens in AvroFormat, so it's not just a JsonFormat bug. Following your logic, we are probably seeing the encoded byte array in the resulting sink.

So, the options are:

  1. Change the data type from a byte array to something else that can hold the value and serializes as a human-readable decimal.
  2. Rewrite JsonConverter/AvroConverter to recognize that those bytes were really a decimal and decode them before writing to the sink. For AvroConverter that might be a bit easier because it carries the schema as part of the structure.

dimon222 avatar Aug 22 '17 13:08 dimon222

IMO anything that does Avro -> JSON should be aware of logical types and convert accordingly. That's the whole point of logical types. I'm not sure whether it belongs in Confluent's converters or in Avro's converter, but it has to be done somewhere. Cc @ewencp @kkonstantine @jcustenborder, I think you guys worked on this (Jeremy, you wrote the converters, right?)

simplesteph avatar Aug 22 '17 23:08 simplesteph

The text ending in == looks likely to be Base64-encoded. Maybe decoding it to a byte array and converting that to a BigDecimal will help someone?

rjharmon avatar Sep 21 '17 21:09 rjharmon

Any clue on a solution for this? Casting sounds like a hack; it would be good to have plain types.

dimon222 avatar Dec 14 '17 17:12 dimon222

Hello, we are also having a similar problem with the data types coming from the Oracle JDBC source connector. I just wanted to know if this is being worked on or if there is a fix for this issue.

sairam881990 avatar Mar 08 '18 16:03 sairam881990

Hello, same problem here.

We are using type bytes, logical type decimal, as documented in the avro spec. https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types

{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 4,
  "scale": 2
}

kafka-avro-console-consumer, for example, displays the bytes as a string:

  • 0 will be displayed as the string "\u0000"
  • 1.0 will be displayed as the string "d"

NicolaeNMV avatar Mar 13 '18 17:03 NicolaeNMV

If you're using Java 8, you can convert the Base64 string to a byte array and then get a BigDecimal with some black magic:

BigDecimal bigDecimal = new BigDecimal(new BigInteger(Base64.getDecoder().decode("BfXhAA==")), scale);

This will convert "BfXhAA==" to its BigDecimal equivalent according to whatever scale you specify. A similar implementation is in the Kafka Connect Decimal.java class:

https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java#L72
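For a complete, runnable version that decodes the value from the original report (the scale of 2 below is only an assumption; the real scale comes from your field's schema):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

public class DecodeDecimalDemo {
    public static void main(String[] args) {
        int scale = 2; // assumed for illustration; use the scale declared in the source schema
        byte[] unscaled = Base64.getDecoder().decode("BfXhAA==");
        BigDecimal decoded = new BigDecimal(new BigInteger(unscaled), scale);
        System.out.println(decoded); // prints 1000000.00 with scale 2
    }
}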

westonjackson avatar Jun 06 '18 20:06 westonjackson

@westonjackson that's one good way to do this, it's working for me

bobbui avatar Jul 16 '18 22:07 bobbui

@westonjackson The return value of your method is an integer, but the result I want is a double, so the decimal point is lost during the conversion. How should I handle that?

git-lyn avatar Dec 14 '18 01:12 git-lyn

Same issue here. I'm reading data from Oracle into Kafka with JSON/JDBC. For integer values or DECIMAL(N,0) I can hack around it in PySpark like this:

df = df.withColumn('QTY0', unbase64(df.QTY))
df = df.withColumn('QTY1', hex(df.QTY0))
df = df.withColumn('QTY2', conv(df.QTY1, 16, 10).cast('integer'))

However, for decimals I always lose the decimal point.

jonathan-gabriel avatar Dec 06 '19 23:12 jonathan-gabriel

We have a similar issue using the S3 sink, and our data includes currency values. I tried the Debezium suggestion and I'm able to retrieve a BigDecimal, but the Schema.Type still remains BYTES, and I can't find a suitable schema type for BigDecimal other than BYTES.

https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation

byte[] encoded = ...;
int scale = ...;
final BigDecimal decoded = new BigDecimal(new BigInteger(encoded), scale);

@westonjackson How did you apply BigDecimal to the schema, did you write a transform? @simplesteph Did you get a solution for this?

baghelavinash avatar Apr 08 '20 21:04 baghelavinash

Similar problem here. We use the Confluent REST Proxy to write and read Avro messages. Trying to make this work for messages that contain decimal fields, we have no way to convert in either direction.

This means we don't know what to 'put' in the input JSON, and we don't know how to interpret the eventual output JSON value.

Any progress on this issue or further suggestions?

LeonardoBonacci avatar Apr 29 '20 23:04 LeonardoBonacci

I posted an answer on Stack Overflow which might help you convert a BigDecimal to JSON so that Avro will understand it.

xjahic avatar May 11 '20 08:05 xjahic

It's totally unreasonable that a user is expected to know how Java decodes the bytes-plus-scale representation when reading a JSON payload. The standard way of representing a decimal in JSON is to store it in a string, like "1.23456789"; you'd construct it with new BigDecimal("1.23456789") in Java. I can't believe that this is such a difficult issue to solve.

ziggythehamster avatar Jun 10 '20 23:06 ziggythehamster

I should add that this is very easily reproducible. Encode an Avro payload with a Confluent Schema Registry header. Here's a really simple schema:

{
  "type" : "record",
  "name" : "person",
  "namespace" : "some-sample",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "int"
  }, {
    "name" : "salary",
    "type" : {
      "type" : "bytes",
      "logicalType" : "decimal",
      "precision" : 9,
      "scale" : 6
    }
  } ]
}

Correctly encode a payload with this schema and the schema header and drop it into a Kafka topic. Configure Kafka Connect to sink this data to S3 or something as JSON. Boom, it does this Base64 thing.
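A minimal sink config along these lines is enough to reproduce it (bucket, region, topic and schema-registry URL below are placeholders):

connector.class=io.confluent.connect.s3.S3SinkConnector
topics=person-topic
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
s3.bucket.name=my-bucket
s3.region=us-east-1
flush.size=1000
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081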

ziggythehamster avatar Jun 11 '20 00:06 ziggythehamster

Here's a link to the Kafka Connect PR that brought in support for this, but I have no idea why it doesn't then work here: https://github.com/apache/kafka/pull/7354
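If that PR is the KIP-481 change, it added a decimal.format option to JsonConverter, so in theory a converter config like the sketch below should emit decimals as plain JSON numbers; whether the S3 sink's JsonFormat actually honours it is exactly what seems unclear here:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.decimal.format=NUMERIC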

ziggythehamster avatar Jun 15 '20 17:06 ziggythehamster

Is there any news on this, @ziggythehamster @kkonstantine? We are using v10.0.1, and decimals ("type": "bytes", "logicalType": "decimal") are still landing in S3 in Base64 format.

KubilayKarayilan avatar Sep 08 '21 11:09 KubilayKarayilan


Hi @NicolaeNMV - is this resolved for you? If yes, could you please share the fix? I'm facing the same issue now.

https://github.com/confluentinc/kafka-connect-storage-cloud/issues/48#issuecomment-372758363

SPavanManda avatar Oct 19 '22 10:10 SPavanManda

Try setting decimal.handling.mode to double in the Debezium connector. All numeric values will then be sent as doubles and this issue can be avoided.
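As a sketch, that is a single property on the Debezium source connector config; note that double trades exactness for readability (string is an alternative mode that keeps exact values, but as JSON strings):

decimal.handling.mode=double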

sridevs avatar Aug 07 '23 01:08 sridevs