KafkaTrigger (.NET) does not work with an Avro schema that has an array of records
I have an Avro schema that looks like this:

```json
{
  "type": "array",
  "items": {
    "type": "record",
    "name": "myrecord",
    "fields": [
      ...
    ]
  }
}
```
If I try to use this Avro schema in my KafkaTrigger, via either the "SchemaRegistryUrl" or the "AvroSchema" property, with the events parameter typed as `KafkaEventData<GenericRecord>[]`, I get the following error:

```
Confluent.Kafka: Local: Value deserialization error. System.Private.CoreLib: Unable to cast object of type 'System.Object[]' to type 'Avro.Generic.GenericRecord'.
```
Changing the events type to `KafkaEventData<GenericRecord[]>[]` results in the same error.
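For reference, here is a minimal sketch of what my function looks like (broker, topic, and consumer group names are placeholders; the real values come from app settings):

```csharp
using Avro.Generic;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

public static class AvroArrayFunction
{
    [FunctionName("AvroArrayTrigger")]
    public static void Run(
        // "BrokerList" and "SchemaRegistryUrl" resolve from app settings;
        // "mytopic" and "mygroup" are placeholders.
        [KafkaTrigger("BrokerList", "mytopic",
            ConsumerGroup = "mygroup",
            SchemaRegistryUrl = "SchemaRegistryUrl")]
        KafkaEventData<GenericRecord>[] events,
        ILogger log)
    {
        // Never reached: value deserialization fails before the function body runs.
        foreach (var ev in events)
        {
            log.LogInformation(ev.Value?.ToString());
        }
    }
}
```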
Is it not possible to deserialize messages whose Avro schema is an array type? I have to use SchemaRegistryUrl, as the Avro schema is managed by another team (the providers), so I can't hardcode the schema as a C# class.
@michny Are you using the dotnet or dotnet-isolated model for your function? Also, is the behavior similar to what is described in #452?
I am using "dotnet" for the function. As for #452, it appears different: my deserialization fails completely by throwing an exception, whereas the other issue seems to just get "null" as the value of the array. They also differ in that my Avro schema is an array of records, whereas the schema in the other issue is a record with an array field. To me it looks like the issue I am hitting is that the message contains an array ("Object[]"), which matches the schema, but the code expects the message to be a single record when it deserializes to "GenericRecord".
I have just tried recreating the setup in a console app using Confluent.Kafka 2.2.0, Confluent.SchemaRegistry 2.2.0, and Confluent.SchemaRegistry.Serdes.Avro, and I get the exact same error, so it is probably an error in the underlying libraries rather than a problem with the KafkaTrigger itself.
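Roughly, the console repro looked like this (broker, schema registry URL, topic, and group id are placeholders):

```csharp
using System;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

using var schemaRegistry = new CachedSchemaRegistryClient(
    new SchemaRegistryConfig { Url = "http://localhost:8081" });

using var consumer = new ConsumerBuilder<Ignore, GenericRecord>(
        new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "repro",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        })
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("mytopic");

// Throws a ConsumeException: "Local: Value deserialization error ...
// Unable to cast object of type 'System.Object[]' to type 'Avro.Generic.GenericRecord'."
var result = consumer.Consume();
Console.WriteLine(result.Message.Value);
```

The Consume call throws a ConsumeException wrapping the same cast error, which points at Confluent's `AvroDeserializer<GenericRecord>` rather than the extension.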
But I am still looking for some way around it, so I can deserialize the messages.
@jainharsh98 are there any plans to upgrade the library to use Confluent.Kafka v2.2.0?
I have found a workaround for my issue, which is to use the Chr.Avro.Confluent library (https://github.com/ch-robinson/dotnet-avro) for the deserialization. However, that library uses Confluent.Kafka 2.2.0 and therefore clashes with azure-functions-kafka-extension, which uses Confluent.Kafka 1.9.0. Using both libraries at the same time forces azure-functions-kafka-extension onto 2.2.0, which results in the following error on startup:
```
A host error has occurred during startup operation '7317244d-cf39-4871-a99b-2588681b02b6'.
System.Private.CoreLib: Method 'RegisterSchemaAsync' in type 'Microsoft.Azure.WebJobs.Extensions.Kafka.LocalSchemaRegistry' from assembly 'Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.9.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35' does not have an implementation.
```
Of course I would ideally like to just use azure-functions-kafka-extension for triggering the function and doing the deserialization, but an acceptable workaround for me would be to use azure-functions-kafka-extension only to trigger the function when new messages arrive in Kafka, and then use Chr.Avro.Confluent to deserialize the byte[] payloads into the .NET classes I have; see the sketch below.
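Something along these lines is what I have in mind, assuming the version clash gets resolved (MyRecord, the registry URL, and the topic/group names are placeholders, and I'm going by the Chr.Avro docs, so the exact deserializer API may differ):

```csharp
using System.Threading.Tasks;
using Chr.Avro.Confluent;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;

// Placeholder for the .NET class matching the "myrecord" schema.
public class MyRecord
{
    // fields matching the Avro record...
}

public static class AvroArrayWorkaround
{
    // Registry URL is a placeholder; in practice it would come from configuration.
    private static readonly CachedSchemaRegistryClient Registry =
        new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "http://localhost:8081" });

    // Chr.Avro maps a top-level Avro array to a .NET array, which is
    // exactly what Confluent's AvroDeserializer<GenericRecord> cannot do.
    private static readonly AsyncSchemaRegistryDeserializer<MyRecord[]> Deserializer =
        new AsyncSchemaRegistryDeserializer<MyRecord[]>(Registry);

    [FunctionName("AvroArrayWorkaround")]
    public static async Task Run(
        // Let the extension deliver the raw bytes instead of deserializing.
        [KafkaTrigger("BrokerList", "mytopic", ConsumerGroup = "mygroup")]
        KafkaEventData<byte[]>[] events,
        ILogger log)
    {
        foreach (var ev in events)
        {
            // ev.Value still carries the Confluent wire format (magic byte +
            // schema id + Avro payload), which the registry deserializer expects.
            MyRecord[] records = await Deserializer.DeserializeAsync(
                ev.Value,
                isNull: ev.Value == null,
                new SerializationContext(MessageComponentType.Value, ev.Topic));

            log.LogInformation("Deserialized {Count} records", records.Length);
        }
    }
}
```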
@michny We have the library upgrade on our roadmap. Since this is a major version change, any breaking changes need to be taken care of before the release. We currently do not have an ETA for this.