
Avro deserialization success but cannot print the message

Open karthik-arris opened this issue 7 years ago • 3 comments

I have a legacy C++ system that emits binary-encoded Avro data which is meant to support the Confluent Schema Registry format. In my Java application, I successfully deserialized the message using the KafkaAvroDeserializer class, but I could not print out the message.

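```java
import java.util.Arrays;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

private void consumeAvroData() {
    String group = "group1";
    Properties props = new Properties();
    // bootstrap.servers takes host:port pairs, not URLs
    props.put("bootstrap.servers", "1.2.3.4:9092");
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", LongDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
    props.put("schema.registry.url", "http://1.2.3.4:8081");

    // The key type must match key.deserializer: LongDeserializer yields Long keys
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    System.out.println("Subscribed to topic " + TOPIC_NAME);

    while (true) {
        ConsumerRecords<Long, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<Long, GenericRecord> record : records) {
            System.out.printf("value = %s%n", record.value());
        }
    }
}
```

The output I see is:

```
{"value":"�"}
```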

Any help is appreciated!

karthik-arris, Aug 25 '17 17:08

@karthik-arris I would be happy to help you with this, but it would be great if you could provide some example records from the topic.

mageshn, Oct 06 '17 18:10

Hi @mageshn, thanks for responding.

## Scenario 1: Works

My workflow: I create .avsc files, generate C++ classes with the avrogencpp tool, and produce Avro binary-encoded data in my C++ application.

test.avsc:

```json
{
    "namespace": "com.company.project",
    "name": "Component_DeviceInfo",
    "type": "record",
    "doc": "Identifies a client device",
    "fields": [
        {
            "name": "deviceId",
            "type": [
                "null",
                "string"
            ],
            "default": null,
            "doc": "Multicast Data Client Device Id. Usually unique MAC address"
        },
        {
            "name": "zoneId",
            "type": [
                "null",
                "string"
            ],
            "default": null,
            "doc": "Zone id where device belongs to"
        }
    ]
}
```

Encoder (C++):

```cpp
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");
std::vector<char> tele_bytes_;  // not used below

// Binary-encode the record into an in-memory output stream
std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, deviceInfo);
out->flush();

size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

// Read the encoded bytes back out of the stream into row_data
std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);
```
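For reference, the Avro specification's binary encoding writes a record as its fields in schema order, with no field names and no embedded schema: a union value is its zero-based branch index followed by the value, and ints, longs, and string lengths are zig-zag varints. Assuming avrogencpp's encoder follows the spec, the stdlib-only Java sketch below (the class `DeviceInfoEncoding` is hypothetical) computes the bytes `row_data` should hold for this record.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hand-rolled Avro binary encoding of Component_DeviceInfo, following the
 * Avro spec: zig-zag varints for longs, branch index + value for unions,
 * byte length + UTF-8 bytes for strings.
 */
final class DeviceInfoEncoding {

    // Zig-zag encode a long, then write it as a little-endian base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Encode a ["null", "string"] union: branch 0 for null, branch 1 for a string.
    static void writeNullableString(ByteArrayOutputStream out, String s) {
        if (s == null) {
            writeLong(out, 0);
            return;
        }
        writeLong(out, 1);
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Record fields are written back to back in schema order, with no framing.
    static byte[] encodeDeviceInfo(String deviceId, String zoneId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeNullableString(out, deviceId);
        writeNullableString(out, zoneId);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeDeviceInfo("device1", "zone1")) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

Run as-is, it prints `02 0e 64 65 76 69 63 65 31 02 0a 7a 6f 6e 65 31` (16 bytes), which, under that assumption, is what the logged BYTE COUNT should match.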

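Java decoder:

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;

@Override
public Object deserializeByteArr(Schema schema, final byte[] data) {
    // A GenericDatumReader matches the GenericRecord result type
    DatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
    // The bytes are plain Avro binary, so the schema they were written with must be supplied
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    try {
        GenericRecord userData = genericDatumReader.read(null, decoder);
        System.out.println(userData);
        return userData;
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
```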
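Because these bytes carry no schema, a reader can only walk them with the writer's field order in hand, which is why the decoder must be given the schema the C++ side wrote with. The stdlib-only sketch below (the class `DeviceInfoDecoding` is hypothetical) hand-decodes the Scenario 1 bytes, assuming a spec-compliant encoder produced them; it only illustrates the byte walk and is not a replacement for a `DatumReader`.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hand-rolled decoder for Component_DeviceInfo: reads the fields in the order
 * the schema declares them, since nothing in the bytes names the schema.
 */
final class DeviceInfoDecoding {

    private final byte[] data;
    private int pos;

    DeviceInfoDecoding(byte[] data) {
        this.data = data;
    }

    // Read a little-endian base-128 varint and undo the zig-zag encoding.
    long readLong() {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = data[pos++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // Read a ["null", "string"] union; any branch other than 0 or 1 is invalid.
    String readNullableString() {
        long branch = readLong();
        if (branch == 0) {
            return null;
        }
        if (branch != 1) {
            throw new IllegalStateException("invalid union branch " + branch);
        }
        int len = (int) readLong();
        String s = new String(data, pos, len, StandardCharsets.UTF_8);
        pos += len;
        return s;
    }

    public static void main(String[] args) {
        byte[] bytes = {
            0x02, 0x0E, 'd', 'e', 'v', 'i', 'c', 'e', '1',
            0x02, 0x0A, 'z', 'o', 'n', 'e', '1'
        };
        DeviceInfoDecoding in = new DeviceInfoDecoding(bytes);
        String deviceId = in.readNullableString();
        String zoneId = in.readNullableString();
        System.out.println("deviceId=" + deviceId + ", zoneId=" + zoneId);
    }
}
```

It prints `deviceId=device1, zoneId=zone1`. Reading the same bytes with a different schema would not fail cleanly; it would misinterpret each varint.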

## Scenario 2: Does not work

Note that I have updated the schema and regenerated the C++ files from it.

test.avsc:

```json
[
    {
        "namespace": "com.company.project",
        "name": "Component_DeviceInfo",
        "type": "record",
        "doc": "Identifies a client device",
        "fields": [
            {
                "name": "deviceId",
                "type": [
                    "null",
                    "string"
                ],
                "default": null,
                "doc": "Unique MAC address"
            },
            {
                "name": "zoneId",
                "type": [
                    "null",
                    "string"
                ],
                "default": null,
                "doc": "Zone id where Client device belongs to"
            }
        ]
    },
    {
        "namespace": "com.company.project",
        "name": "Component_EventList",
        "type": "record",
        "doc": "Component Event list",
        "fields": [
            {
                "name": "deviceInfo",
                "type": [
                    "null",
                    "com.company.project.Component_DeviceInfo"
                ],
                "default": null,
                "doc": "Device information such as device id and zone id"
            }
        ]
    }
]
```

Encoder (C++):

```cpp
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");

std::vector<char> tele_bytes_;  // not used below

// Put the device info into the event list's ["null", Component_DeviceInfo] union
Component_EventList ComponentEventList;
ComponentEventList.deviceInfo.set_Component_DeviceInfo(deviceInfo);

// Binary-encode the event list into an in-memory output stream
std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, ComponentEventList);
out->flush();

size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

// Read the encoded bytes back out of the stream into row_data
std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);
```

Output:

```
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.company.telemetry.services.consumer.TelemetryConsumerService.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, byte[]>)' threw exception; nested exception is java.lang.ArrayIndexOutOfBoundsException: 7
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570) [spring-kafka-1.1.6.RELEASE.jar:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_91]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_91]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[avro-1.7.7.jar:1.7.7]
	at com.company.telemetry.services.serde.AvroByteArrDeserializer.deserializeByteArr(AvroByteArrDeserializer.java:32) ~[classes/:na]
	at com.company.telemetry.services.TelemetryService.handleByteArr(TelemetryService.java:59) ~[classes/:na]
	at com.company.telemetry.services.consumer.TelemetryConsumerService.consume(TelemetryConsumerService.java:39) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_91]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_91]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.6.RELEASE.jar:na]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.6.RELEASE.jar:na]
	... 8 common frames omitted
```

I appreciate your help! Let me know if you need further information.

karthik-arris, Oct 06 '17 21:10
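One reading of the Scenario 2 trace, offered as a hypothesis: in Avro a JSON array is a union schema, so if the Java side parsed the whole test.avsc array and handed that union to the reader while the C++ side encoded a bare `Component_EventList` (with no top-level branch index), every read would be shifted by one varint. The stdlib-only sketch below (the class `UnionMisalignment` is hypothetical, and the bytes assume a spec-compliant encoder) replays the first three union-index reads under that assumption.

```java
/**
 * Replays the first three union-index reads of a reader whose schema is the
 * whole test.avsc array (a union of Component_DeviceInfo and
 * Component_EventList) when the writer encoded a bare Component_EventList.
 */
final class UnionMisalignment {

    // Assumed Scenario 2 bytes: deviceInfo union branch 1, then the nested
    // Component_DeviceInfo record (deviceId and zoneId, each branch 1).
    static final byte[] EVENT_LIST = {
        0x02,
        0x02, 0x0E, 'd', 'e', 'v', 'i', 'c', 'e', '1',
        0x02, 0x0A, 'z', 'o', 'n', 'e', '1'
    };

    // Read one zig-zag varint at pos[0] and advance pos[0] past it.
    static long readLong(byte[] data, int[] pos) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = data[pos[0]++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // The three branch indexes the union-schema reader would see, in order.
    static long[] unionIndexesSeen(byte[] data) {
        int[] pos = {0};
        long topLevel = readLong(data, pos);   // [Component_DeviceInfo, Component_EventList]
        long deviceInfo = readLong(data, pos); // EventList.deviceInfo: ["null", Component_DeviceInfo]
        long deviceId = readLong(data, pos);   // DeviceInfo.deviceId: ["null", "string"]
        return new long[] {topLevel, deviceInfo, deviceId};
    }

    public static void main(String[] args) {
        long[] idx = unionIndexesSeen(EVENT_LIST);
        System.out.printf("top-level=%d, deviceInfo=%d, deviceId=%d%n", idx[0], idx[1], idx[2]);
    }
}
```

It prints `top-level=1, deviceInfo=1, deviceId=7`: the third read consumes `0x0E`, the length prefix of "device1", as a branch index for a two-branch union. That would be consistent with `ArrayIndexOutOfBoundsException: 7` and with the three nested `GenericDatumReader.read(...:155)` frames in the trace. If this is the cause, reading with the `Component_EventList` schema itself (for example, that branch of the parsed union via `Schema.getTypes()`) should realign the reads.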

I don't think that stack trace comes from the original code you posted, which uses `props.put("value.deserializer", KafkaAvroDeserializer.class.getName())`.

The trace goes through a custom Avro deserializer, `com.company.telemetry.services.serde.AvroByteArrDeserializer`, not KafkaAvroDeserializer.

Second, the C++ code doesn't follow the Confluent wire format that Schema Registry support requires.

OneCricketeer, Feb 08 '22 23:02
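For reference, the Confluent wire format prefixes each message with a magic byte (`0`) and a 4-byte big-endian schema ID, followed by the plain Avro binary payload; KafkaAvroDeserializer uses that ID to fetch the writer schema from the registry. A stdlib-only Java sketch of adding and checking the header follows. The class name and the schema ID `42` are hypothetical; in practice the ID comes from registering the schema with the registry, and the C++ producer would need to write the same five bytes in front of `row_data`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch of the Confluent Schema Registry wire format:
 * magic byte 0, 4-byte big-endian schema ID, then the Avro binary payload.
 */
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;

    // Prefix a raw Avro payload with the magic byte and schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(1 + 4 + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Return the schema ID, rejecting data that lacks the 5-byte header.
    static int schemaId(byte[] message) {
        if (message.length < 5 || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not in Confluent wire format");
        }
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Strip the header, leaving bytes a plain Avro binary decoder can read.
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] avro = {0x02, 0x0E};
        byte[] message = frame(42, avro);
        System.out.println("schema id = " + schemaId(message)
                + ", payload bytes = " + payload(message).length);
    }
}
```

It prints `schema id = 42, payload bytes = 2`. Raw Avro bytes with no header, like the C++ output above, would fail the magic-byte check.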