confluent-schema-registry
Error deserialising Avro message (from Java client) on consuming encoded messages produced from this library (NodeJS process)
- We produce the encoded message as below:

```js
const { id } = await this._schemaRegistry.register(JSON.parse(event.schema));
encodedValue = await this._schemaRegistry.encode(id, JSON.parse(request.value));

await transaction.send({
  topic: topic,
  messages: [{ key: request.key, value: encodedValue, headers: headers }],
});
```
- We have a Java client that consumes this message (it uses the Confluent Kafka SDK), and when it tries to consume the message produced by the above code, we get the error below.
```
2020-07-06T14:51:13.055321002Z 14:51:13.053 [-C-1] ERROR org.springframework.kafka.listener.LoggingErrorHandler - Error while processing: null
2020-07-06T14:51:13.055324002Z org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-5 at offset 0. If needed, please seek past the record to continue consumption.
2020-07-06T14:51:13.055327102Z Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
2020-07-06T14:51:13.055330102Z Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
```
Per the thread below, it looks like the standard deserialiser in Java cannot deserialise messages encoded from NodeJS with confluent-schema-registry.
https://stackoverflow.com/questions/52399417/unknown-magic-byte-with-kafka-avro-console-consumer?rq=1
NOTE: There are no issues consuming from a NodeJS client using this same library.

We want messages to be consumable by any client; we cannot restrict consumers to NodeJS and this library. Please advise.
I have investigated this further and found an issue with `MAGIC_BYTE` inside the `encode` method. I reviewed the Java deserialisation code from Confluent Schema Registry, and it uses hex `0x0`, whereas this library encodes with `MAGIC_BYTE = Buffer.alloc(1)` with no encoding specified. If I override this with `MAGIC_BYTE = Buffer.alloc(1, 0, "hex");`, then the Java consumer is able to consume with no issues.
https://github.com/confluentinc/schema-registry/blob/027aacfb20c9daf801068a5273ae6f2ac9f455d7/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java#L204
Can we fix this and release new version?
For now, I am overriding it with the code below (consumer code):

```js
let { MAGIC_BYTE } = require('@kafkajs/confluent-schema-registry/dist/encoder');
MAGIC_BYTE = Buffer.alloc(1, 0, 'hex');
```
I'm very sceptical that this is indeed doing anything. The two buffers should be identical:
```js
> Buffer.compare(Buffer.alloc(1), Buffer.alloc(1, 0, 'hex'))
0 // 0 means equal
> Buffer.alloc(1).toString()
'\u0000'
> Buffer.alloc(1, 0).toString()
'\u0000'
> Buffer.alloc(1, 0, 'hex').toString()
'\u0000'
```
The `encoding` parameter to `Buffer.alloc` does nothing unless the `fill` argument is a string, and not setting `fill` to anything is equivalent to `0`. See https://github.com/nodejs/node/blob/6ae1b9c457444fdfa5e30d9142bb8cdccf35f8ee/lib/buffer.js#L967
There must be something else going on. Could you perhaps write a repro case or encode with and without your patch and compare the buffers to see what's different?
Probably within the Node environment both are the same, but on the Java client we get the exception if we do not explicitly mention hex as the encoding in NodeJS. I am also wondering why NodeJS would keep an extra `encoding` parameter if it doesn't make any difference. We are also using the tool https://operatr.io for Kafka, and it likewise wasn't able to decode the Avro message when we send a message from NodeJS that doesn't specify hex explicitly.
> Probably within node environment, both are the same
The wire format is binary, so it doesn't matter which language generated it. Binary is binary, after all. There might very well be a bug somewhere, but the proposed fix provably does nothing, so we would need to find out what the bug actually is.
> Also I am wondering why would NodeJS keep an extra parameter of encoding if doesn't make any difference
Javascript does not have method overloading, so there's no way to have both `alloc(size: Number, fill: String, encoding: String)` and `alloc(size: Number, fill: Number)`. So as you can see from the code, it does a bunch of type checks to behave differently depending on what arguments it receives.
Okay, I will keep investigating this issue. We just do not want messages produced by this library to be readable only by consumers using this same library. In the realm of choreography-based event-driven architecture, any type of client could wish to react to the events (Kafka messages) produced.
@sureshballa we have systems in production encoding events with this library and other Java systems consuming the events. I can confirm that it works as you expect, maybe you could share the schema you have, perhaps the support for something specific is not working.
@tulios sure. Here is the sample one that we are using for testing purpose
```json
{
  "name": "IntegrationEventData",
  "type": "record",
  "namespace": "com.company.model",
  "fields": [
    { "name": "dateTimeOccurred", "type": "string", "default": "null" },
    {
      "name": "data",
      "type": {
        "name": "data",
        "type": "record",
        "fields": [
          { "name": "roleId", "type": "string", "default": "null" },
          { "name": "roleName", "type": "string", "default": "null" }
        ]
      }
    },
    { "name": "eventSource", "type": "string", "default": "null" },
    { "name": "eventType", "type": "string", "default": "null" },
    { "name": "eventId", "type": "string", "default": "null" }
  ]
}
```
@sureshballa seems like this issue is still open. Have you figured out what the root cause is for the error that you were seeing? We are seeing a similar issue on our end. We have a node producer and a non-node consumer, and the consumer isn't able to deserialize the messages.