schema-registry
Avro deserialization succeeds, but the message cannot be printed
I have a legacy C++ based system that emits binary-encoded Avro data intended to follow the Confluent Avro Schema Registry format. In my Java application, I successfully deserialize the message using the KafkaAvroDeserializer class, but I cannot print the message correctly.
```java
private void consumeAvroData() {
    String group = "group1";
    Properties props = new Properties();
    props.put("bootstrap.servers", "1.2.3.4:9092"); // host:port, no URL scheme
    props.put("group.id", group);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", LongDeserializer.class.getName());
    props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
    // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "false");
    props.put("schema.registry.url", "http://1.2.3.4:8081");

    // Key type matches LongDeserializer above
    KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    System.out.println("Subscribed to topic " + TOPIC_NAME);

    while (true) {
        ConsumerRecords<Long, GenericRecord> records = consumer.poll(100);
        for (ConsumerRecord<Long, GenericRecord> record : records) {
            System.out.printf("value = %s%n", record.value());
        }
    }
}
```
The output I see is
{"value":"�"}
Any help appreciated!
@karthik-arris I'd be happy to help you with this, but it would be great if you could provide some examples of the records in the topic.
Hi @mageshn, thanks for responding.
## Scenario 1 - Works
My workflow is: I create .avsc files, generate C++ classes with the avrogencpp tool, and produce Avro binary-encoded data in my C++ application.
test.avsc
```json
{
  "namespace": "com.company.project",
  "name": "Component_DeviceInfo",
  "type": "record",
  "doc": "Identifies a client device",
  "fields": [
    {
      "name": "deviceId",
      "type": ["null", "string"],
      "default": null,
      "doc": "Multicast Data Client Device Id. Usually unique MAC address"
    },
    {
      "name": "zoneId",
      "type": ["null", "string"],
      "default": null,
      "doc": "Zone id where device belongs to"
    }
  ]
}
```
Encoder - C++
```cpp
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");
std::vector<char> tele_bytes_;

std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, deviceInfo);
out->flush();
size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);
```
Java Decoder
```java
@Override
public Object deserializeByteArr(Schema schema, final byte[] data) {
    // GenericDatumReader is the idiomatic reader when the target is GenericRecord
    DatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    try {
        GenericRecord userData = genericDatumReader.read(null, decoder);
        System.out.println(userData);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
```
## Scenario 2 - Does not work
Note that I have updated the schema and regenerated the C++ classes corresponding to it.
test.avsc
```json
[
  {
    "namespace": "com.company.project",
    "name": "Component_DeviceInfo",
    "type": "record",
    "doc": "Identifies a client device",
    "fields": [
      {
        "name": "deviceId",
        "type": ["null", "string"],
        "default": null,
        "doc": "Unique MAC address"
      },
      {
        "name": "zoneId",
        "type": ["null", "string"],
        "default": null,
        "doc": "Zone id where Client device belongs to"
      }
    ]
  },
  {
    "namespace": "com.company.project",
    "name": "Component_EventList",
    "type": "record",
    "doc": "Component Event list",
    "fields": [
      {
        "name": "deviceInfo",
        "type": ["null", "com.company.project.Component_DeviceInfo"],
        "default": null,
        "doc": "Device information such as device id and zone id"
      }
    ]
  }
]
```
Encoder - C++
```cpp
Component_DeviceInfo deviceInfo;
deviceInfo.deviceId.set_string("device1");
deviceInfo.zoneId.set_string("zone1");
std::vector<char> tele_bytes_;

Component_EventList ComponentEventList;
ComponentEventList.deviceInfo.set_Component_DeviceInfo(deviceInfo);

std::auto_ptr<avro::OutputStream> out = avro::memoryOutputStream(1);
avro::EncoderPtr enc = avro::binaryEncoder();
enc->init(*out);
avro::encode(*enc, ComponentEventList);
out->flush();
size_t byte_count = out->byteCount();
DBG("BYTE COUNT " << byte_count);

std::auto_ptr<avro::InputStream> in = avro::memoryInputStream(*out);
avro::StreamReader reader(*in);
std::vector<uint8_t> row_data(byte_count);
reader.readBytes(&row_data[0], byte_count);
```
Output
```
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.company.telemetry.services.consumer.TelemetryConsumerService.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, byte[]>)' threw exception; nested exception is java.lang.ArrayIndexOutOfBoundsException: 7
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570) [spring-kafka-1.1.6.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_91]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
Caused by: java.lang.ArrayIndexOutOfBoundsException: 7
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) ~[avro-1.7.7.jar:1.7.7]
at com.company.telemetry.services.serde.AvroByteArrDeserializer.deserializeByteArr(AvroByteArrDeserializer.java:32) ~[classes/:na]
at com.company.telemetry.services.TelemetryService.handleByteArr(TelemetryService.java:59) ~[classes/:na]
at com.company.telemetry.services.consumer.TelemetryConsumerService.consume(TelemetryConsumerService.java:39) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_91]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_91]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:180) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:112) ~[spring-messaging-4.3.11.RELEASE.jar:4.3.11.RELEASE]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:174) ~[spring-kafka-1.1.6.RELEASE.jar:na]
... 8 common frames omitted
```
Appreciate your help! Let me know if you need further information.
I don't think that stack trace corresponds to the original code you posted (`props.put("value.deserializer", KafkaAvroDeserializer.class.getName())`): the trace shows a custom Avro deserializer, `com.company.telemetry.services.serde.AvroByteArrDeserializer`, rather than `KafkaAvroDeserializer`.

Secondly, the C++ code doesn't adhere to the Confluent wire format required for registry support: each message value must begin with a magic byte (0) and a 4-byte big-endian schema ID, followed by the Avro binary payload.