apicurio-registry

Some messages don't have MAGIC_BYTE when used with Debezium

Open j2gg0s opened this issue 1 year ago • 4 comments

Only the second message's value starts with \x00.

k exec -ti -n infra-kafka x01-0 -c kafka -- /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --max-messages 3 --value-deserializer "org.apache.kafka.common.serialization.BytesDeserializer" --topic kcx.cactus.lifestyle.wish_spu --property print.partition=true --property print.offset=true --property print.headers=true --partition=0
Partition:0     Offset:5144346  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x02\xB8\xB0\x8B\x80\xE0\xB7\xAA\xA1\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06&\x9A\xD0\x90\xC6\xD6\xCE\xBAa\xD0\xC3\xE3\xD7\xDDa\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x02\xB8\xB0\x8B\x80\xE0\xB7\xAA\xA1\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06!\x91\xC0\x90\xC6\xD6\xCE\xBAa\xC0\xCF\xF1\xA0\x9Fc\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839102 mysql-bin.005635\xF0\xCC\xC4\xA4\x02\x00\x02\xA8\xEF(\x00\x02u\x02\xAC\x90\xB6\x85\x9Fc\x00
Partition:0     Offset:5144347  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x00\x02\xC4\xBD\x88\x80\x80\xB9\xAE\xF0\x12\xC0\xA1\x9B\x80\x80\x92\xC6\xE4\x0F\x02\x02\x00\x90\xDF\xF1\xA0\x9Fc\x90\xDF\xF1\xA0\x9Fc\x00\x9A\xB6z\xC8\x92\x86\x80\xC0\x85\xA8\xCC\x0A\xAE\xE4\x8C\x01\xC8#\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839140 mysql-bin.005635\xA0\xE9\xC8\xA4\x02\x00\x02\xA8\xEF(\x00\x02c\x02\x96\x98\xB6\x85\x9Fc\x00
Partition:0     Offset:5144348  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x02\xE2\xF9\x92\x80\xC0\x90\x8A\xA2\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06#f\x80\x90\xD9\xC5\xB4\xDBa\x80\xAE\xE5\xD7\xDDa\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x02\xE2\xF9\x92\x80\xC0\x90\x8A\xA2\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06!\x91\xC0\x90\xD9\xC5\xB4\xDBa\x90\xDF\xF1\xA0\x9Fc\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839146 mysql-bin.005635\xEC\xA5\xC9\xA4\x02\x00\x02\xA8\xEF(\x00\x02u\x02\xBC\x98\xB6\x85\x9Fc\x00
Processed a total of 3 messages

My sink (kafka-connect-jdbc) consumes the messages correctly with Apicurio's converter.

Is this a bug?

j2gg0s avatar Jan 11 '24 11:01 j2gg0s

Can you include any configuration you have for your source/sink applications? It looks to me like the messages have headers, and the globalId of the schema is included in the headers. When that is the case, there should not be a magic byte in the payload. The magic byte is only present when the globalId is encoded in the payload, not when it's included in the headers.
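To make the distinction concrete, here is a minimal Python sketch of how a consumer could decide where the schema ID lives for a single record. It is an illustration, not Apicurio's actual deserializer code: the function name `locate_global_id` is hypothetical, and it assumes the ID is an 8-byte big-endian long both in the `apicurio.value.globalId` header and after the magic byte in the payload. A different ID handler may use a different width.

```python
MAGIC_BYTE = 0x00   # marker byte that prefixes the ID when it is carried in the payload
ID_SIZE = 8         # assumption: 8-byte big-endian long; other ID handlers may differ


def locate_global_id(headers, value, header_name="apicurio.value.globalId"):
    """Return (location, global_id) for one record.

    headers: dict mapping header name to raw header bytes
    value:   raw record value bytes
    """
    raw = headers.get(header_name)
    if raw is not None:
        # ID travels in the header, so the payload is plain Avro with no prefix.
        return "header", int.from_bytes(raw, "big", signed=True)
    if len(value) > ID_SIZE and value[0] == MAGIC_BYTE:
        # No header: expect magic byte, then the ID, then the Avro data.
        return "payload", int.from_bytes(value[1:1 + ID_SIZE], "big", signed=True)
    return "unknown", None


# Hypothetical records using a made-up ID of 13.
with_header = locate_global_id(
    {"apicurio.value.globalId": (13).to_bytes(8, "big")}, b"\x02\xb8\xb0"
)
in_payload = locate_global_id({}, b"\x00" + (13).to_bytes(8, "big") + b"\x02\xb8")
```

When the header is present, the first byte of the value is never inspected, so it can be any value the Avro encoding happens to produce.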

EricWittmann avatar Jan 23 '24 14:01 EricWittmann

@EricWittmann Is this documented anywhere?

j2gg0s avatar Jan 24 '24 09:01 j2gg0s

{
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.connectionTimeZone": "Asia/Shanghai",
    "database.hostname": "***",
    "database.password": "***",
    "database.port": "3306",
    "database.server.id": "5501",
    "database.user": "***",
    "errors.log.enable": "true",
    "errors.retry.timeout": "600000",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "key.converter.apicurio.registry.url": "http://apicurio.kafka:8080/apis/registry/v2",
    "name": "source-cactus",
    "producer.override.max.request.size": "10485760",
    "read.only": "true",
    "schema.history.internal.kafka.bootstrap.servers": "x01.infra-kafka:9092",
    "schema.history.internal.kafka.topic": "kch.cactus",
    "schema.name.adjustment.mode": "avro",
    "snapshot.mode": "initial",
    "table.exclude.list": ".+\\.tp_[0-9]+_.+,.+\\.tpa_[a-zA-Z0-9]+_.+,likes\\..+",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "false",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.default.partitions": "9",
    "topic.creation.default.replication.factor": "1",
    "topic.prefix": "kcx.cactus",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "value.converter.apicurio.registry.url": "http://apicurio.kafka:8080/apis/registry/v2"
  },
  "name": "source-cactus",
  "tasks": [
    {
      "connector": "source-cactus",
      "task": 0
    }
  ],
  "type": "source"
}

j2gg0s avatar Jan 24 '24 09:01 j2gg0s

Eric is right: with your configuration, the information you're looking for is not encoded in the payload. The latest available artifact for that topic name is used. You can find more information in the docs.
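One reading that fits the consumer output in the original report: if the payload is plain Avro, its first byte is simply the start of the record data. The sketch below assumes the value schema is a Debezium change-event envelope whose first field, `before`, is a `["null", Value]` union; the helper names are hypothetical. Avro writes the union branch index as a zigzag varint, so `\x00` means branch 0 (null) and `\x02` means branch 1 (a row image).

```python
def read_avro_long(buf, pos=0):
    """Decode one Avro zigzag varint from buf; return (value, next_position)."""
    acc = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift   # low 7 bits carry data
        if not (byte & 0x80):           # high bit clear marks the last byte
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos  # undo zigzag encoding


def first_union_branch(value):
    """Branch index of the first field, assuming that field is an Avro union."""
    branch, _ = read_avro_long(value)
    return branch


# Leading bytes copied from the three records in the report.
offsets = {
    5144346: b"\x02\xb8\xb0\x8b",
    5144347: b"\x00\x02\xc4\xbd",
    5144348: b"\x02\xe2\xf9\x92",
}
branches = {offset: first_union_branch(v) for offset, v in offsets.items()}
# branches == {5144346: 1, 5144347: 0, 5144348: 1}
```

Under that assumption, the record at offset 5144347 has a null `before`, which is what an insert event looks like. This is consistent with the operation field near the end of each value, which reads `\x02c` for that record and `\x02u` for the other two.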

carlesarnal avatar Mar 12 '24 12:03 carlesarnal

Closing as stale. If any other information is needed, please re-open!

carlesarnal avatar May 15 '24 11:05 carlesarnal