Kafkajs - confluent schema registry error while consuming message from java avro producer (Magic byte)

Open ereshzealous opened this issue 2 years ago • 8 comments

Hi, I have a Spring Cloud producer that uses the Confluent Schema Registry and publishes to a Kafka topic every 15 seconds. Java consumers can consume the messages without any issue. I wrote a Node.js consumer using KafkaJS, and it fails with an error while consuming messages from the topic.

I have source code here. Please have a look and suggest pointers for tracking down the issue.

{
   "level":"ERROR",
   "timestamp":"2021-08-30T09:36:01.664Z",
   "logger":"kafkajs",
   "message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Message encoded with magic byte {\"type\":\"Buffer\",\"data\":[72]}, expected {\"type\":\"Buffer\",\"data\":[0]}",
   "groupId":"test-group",
   "retryCount":5,
   "stack":"KafkaJSNonRetriableError\n  Caused by: ConfluentSchemaRegistryArgumentError: Message encoded with magic byte {\"type\":\"Buffer\",\"data\":[72]}, expected {\"type\":\"Buffer\",\"data\":[0]}\n    at SchemaRegistry.decode (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/@kafkajs/confluent-schema-registry/dist/SchemaRegistry.js:133:19)\n    at Runner.eachMessage (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/src/consumer.js:25:43)\n    at Runner.processEachMessage (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/consumer/runner.js:151:20)\n    at onBatch (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/consumer/runner.js:287:20)\n    at /Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/consumer/runner.js:339:21\n    at invoke (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/utils/concurrency.js:38:5)\n    at push (/Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/utils/concurrency.js:51:7)\n    at /Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/utils/concurrency.js:60:53\n    at new Promise (<anonymous>)\n    at /Users/ereshgorantla/Documents/Dev/My Work/node/kafka-consumer/node_modules/kafkajs/src/utils/concurrency.js:60:20"
}

I was wondering about this; the KafkaJS schema registry client compares the magic byte:

async decode(buffer) {
    if (!Buffer.isBuffer(buffer)) {
        throw new errors_1.ConfluentSchemaRegistryArgumentError('Invalid buffer');
    }
    const { magicByte, registryId, payload } = wireDecoder_1.default(buffer);
    if (Buffer.compare(wireEncoder_1.MAGIC_BYTE, magicByte) !== 0) {
        throw new errors_1.ConfluentSchemaRegistryArgumentError(`Message encoded with magic byte ${JSON.stringify(magicByte)}, expected ${JSON.stringify(wireEncoder_1.MAGIC_BYTE)}`);
    }
    const schema = await this.getSchema(registryId);
    return schema.fromBuffer(payload);
}

ereshzealous avatar Aug 30 '21 10:08 ereshzealous

Hi, any input would be highly appreciated.

ereshzealous avatar Sep 01 '21 07:09 ereshzealous

When a message is encoded for Kafka using the schema registry, the first byte should be 0 (the magic byte), and the next 4 bytes should be the schema ID from the registry, in big-endian (network) order. The serialized Avro data follows. The first byte of your message appears to be 72 instead of 0. Are you actually using the Java client with schema registry to encode the correct wire format, or are you just putting a serialized Avro buffer on the wire? If the latter, that won't work with clients that assume messages are encoded in the schema registry wire format. See the "Wire format" section of the docs here: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

I would assume that the java client integrated with schema registry would do the wire protocol correctly, but without information about how you are encoding the messages, it is hard to say.
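
As a rough illustration of the layout described above, the following sketch reads the 5-byte header from a raw message buffer and reports what it finds. The helper name inspectWireHeader is made up for this example and is not part of KafkaJS or the schema registry client; it relies only on Node's built-in Buffer.

```javascript
// Hypothetical helper (not part of any library): inspect the Confluent wire-format header.
// Layout: byte 0 = magic byte (0), bytes 1-4 = schema ID (big-endian), rest = Avro payload.
function inspectWireHeader(buffer) {
  if (!Buffer.isBuffer(buffer) || buffer.length < 5) {
    return { valid: false, reason: 'need a Buffer of at least 5 bytes' }
  }
  const magicByte = buffer.readUInt8(0)
  if (magicByte !== 0) {
    return {
      valid: false,
      reason: `magic byte is ${magicByte}, expected 0`,
      firstByteAsText: String.fromCharCode(magicByte),
    }
  }
  return {
    valid: true,
    schemaId: buffer.readInt32BE(1),
    payload: buffer.subarray(5),
  }
}

// A buffer framed the way the registry serializer would frame it (schema ID 2).
const wellFormed = Buffer.concat([Buffer.from([0, 0, 0, 0, 2]), Buffer.from('avro-bytes')])
console.log(inspectWireHeader(wellFormed).schemaId) // 2

// A plain-text buffer: its first byte is the character code of "H".
const plainText = Buffer.from('Hello world')
console.log(inspectWireHeader(plainText).reason) // magic byte is 72, expected 0
```

Byte 72 is the ASCII code for "H", so a first byte of 72 is consistent with a buffer that holds plain text (or some other encoding) rather than a registry-framed Avro message.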

ideasculptor avatar Sep 23 '21 21:09 ideasculptor

I am running into the same problem: I cannot decode an Avro message sent from a Java service. The Java service is a simple Spring Boot service. Here is its repo: https://bitbucket.org/nicholasnet/boot-kafka-poc/src/master/

I am also using this Gradle plugin https://github.com/davidmc24/gradle-avro-plugin and I can see the messages correctly in Control Center.

nirmal-chg avatar Oct 01 '21 22:10 nirmal-chg

In my experience, if you can see the message properly broken out in Control Center (except on Confluent Cloud, where Control Center doesn't parse messages correctly yet), then it is being encoded correctly. However, it is also hard to see how the parsing code pasted into this issue could be defective.

wireDecoder.default is implemented like this:

export default (buffer: Buffer) => ({
  magicByte: buffer.slice(0, 1),
  registryId: buffer.slice(1, 5).readInt32BE(0),
  payload: buffer.slice(5, buffer.length),
})

So it really does seem to be getting a value of 72 in the first byte.

Any chance you have an incorrectly encoded message in the topic before the correctly encoded ones? Can you start the consumer so it starts with the most recent message instead of message 0?

@nirmal-chg, I took a quick look at your java client code and see no reason why it should be encoding incorrectly, so I think the same question would apply to you. Can you try with a freshly created topic or only consume new messages?
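
One way to try the suggestion above with KafkaJS is to subscribe with fromBeginning: false under a consumer group that has no committed offsets, since fromBeginning only takes effect when the group has no valid committed offset. The sketch below assumes kafka and registry are instances created by the caller; the function name and the throwaway group ID are invented for illustration.

```javascript
// Sketch only: `kafka` is a KafkaJS `Kafka` instance and `registry` a SchemaRegistry
// instance, both passed in by the caller. The function name is hypothetical.
async function consumeOnlyNewMessages(kafka, registry, topic) {
  // A fresh group ID has no committed offsets, so `fromBeginning: false`
  // makes the consumer start at the latest offset.
  const consumer = kafka.consumer({ groupId: `magic-byte-debug-${Date.now()}` })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: false })
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      try {
        const value = await registry.decode(message.value)
        console.log({ partition, offset: message.offset, value })
      } catch (err) {
        // Log the position instead of crashing so the bad record can be located.
        console.error(`decode failed at ${partition}:${message.offset}: ${err.message}`)
      }
    },
  })
  return consumer
}
```

If newly produced messages decode cleanly under the fresh group, that points to older, differently encoded records in the topic rather than to the decoder.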

ideasculptor avatar Oct 02 '21 00:10 ideasculptor

@ideasculptor thank you very much for looking into this issue. I created a consumer on the Java/Kotlin side to see whether it can consume the messages it produces, and it ran fine. I also swapped to the Maven Avro plugin, thinking the Gradle plugin might be faulty, but that was not the case: everything ran fine.

So it seems there is some issue on the JS side. Here is my JS code, in case it helps; it is borrowed directly from the example.

const { Kafka } = require('kafkajs')
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry')

const kafka = new Kafka({ clientId: 'group_id', brokers: ['localhost:9092', 'localhost:9101'] })
const registry = new SchemaRegistry({ host: 'http://localhost:8081/' })
const consumer = kafka.consumer({ groupId: 'group_id' })

const schema = `
{
    "type": "record",
    "name": "Telemetry",
    "namespace": "com.playpen.kafkaDemo.entity",
    "fields": [
        {
            "name": "id",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "clientId",
            "type": {
                "type": "int"
            }
        },
        {
            "name": "speed",
            "type": {
                "type": "int"
            }
        },
        {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "local-timestamp-micros"
            }
        }
    ]
}
`

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'car-events', fromBeginning: true })

  const { id } = await registry.register({ type: SchemaType.AVRO, schema }) // { id: 2 }

  console.log(id)

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const decodedKey = await registry.decode(message.key)
      const decodedValue = await registry.decode(message.value)
      console.log({ decodedKey, decodedValue })
    },
  })
}

run().catch(console.error)

nirmal-chg avatar Oct 03 '21 03:10 nirmal-chg

Maybe this will save someone a couple of minutes. I hit a similar magic byte traceback today.

In my case:

  • message.value - was Avro-encoded
  • message.key - was just plaintext

So you might want to check that your keys (or values) are actually Avro-encoded, for example by inspecting message.key.toString()
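
A minimal sketch of that check, assuming the registry object exposes an async decode(buffer) method as in the code earlier in this thread. The helper names are hypothetical. The test is a heuristic: a plain payload that happens to start with a zero byte would still be handed to the registry.

```javascript
// Hypothetical helpers, not part of KafkaJS or the schema registry client.

// True when the buffer is long enough for the 5-byte header and starts with magic byte 0.
function hasConfluentHeader(buffer) {
  return Buffer.isBuffer(buffer) && buffer.length >= 5 && buffer[0] === 0
}

// Decode with the registry only when the header is present; otherwise fall back to text.
// `registry` is assumed to expose an async `decode(buffer)` method.
async function decodeOrText(registry, buffer) {
  if (buffer === null || buffer === undefined) return null // keys may be absent
  return hasConfluentHeader(buffer) ? registry.decode(buffer) : buffer.toString()
}

// Possible use inside eachMessage:
//   const decodedKey = await decodeOrText(registry, message.key)
//   const decodedValue = await decodeOrText(registry, message.value)
```

With this guard, a plaintext key is returned as a string instead of triggering the magic byte error, while registry-framed values are still decoded.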

zwerg44 avatar Nov 15 '21 18:11 zwerg44

Same problem for me. Because of this issue, I cannot use KafkaJS.

clodio avatar Jan 27 '22 08:01 clodio

Same issue. I found out that the topic did not have an associated key schema.

noushad-flix avatar Jan 04 '24 12:01 noushad-flix