
DotNet - Confluent.SchemaRegistry.Serdes.AvroDeserializer throws "Confluent.Kafka.ConsumeException" exception.

Open · ghost opened this issue 5 years ago · 22 comments

Description

When creating a schema (which is registered in the Schema Registry) with an array of sub-records, and using a C# keyword in the schema namespace, we are unable to deserialize the Kafka message using AvroDeserializer<T>(...).

The producer uses AvroSerializer<T>(...) successfully to publish messages.

When consuming using Confluent.SchemaRegistry.Serdes.AvroDeserializer<T>, we receive the exception: Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> Avro.AvroException: Unable to find type [email protected] in all loaded assemblies in field SubObjects.

How to reproduce

Create a schema, ensuring the namespace contains the word "event" (a C# keyword). Schema example:

    {
      "type": "record",
      "name": "NewConstructionAddressEvent",
      "namespace": "com.company.sub.event",
      "doc": "@author: Smith, @description: Avro Schema for an address",
      "fields": [
        {
          "name": "eventId",
          "type": { "type": "string", "avro.java.string": "String" },
          "doc": "@required: true, @description: unique id (UUID version 4 and variant 2) for an event, @examples: d15f36fe-ab1e-4d5c-9a04-a1827ac0c330"
        },
        {
          "name": "eventType",
          "type": { "type": "string", "avro.java.string": "String" },
          "doc": "@required: true, @description: operation type for event, @examples: created|updated|deleted"
        },
        {
          "name": "constructionAddressId",
          "type": { "type": "string", "avro.java.string": "String" },
          "doc": "@required: true, @description: unique nds id for a construction address object, @examples: 35051923"
        },
        {
          "name": "units",
          "type": [
            "null",
            {
              "type": "array",
              "items": {
                "type": "record",
                "name": "Unit",
                "fields": [
                  { "name": "unitNumber", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: a specific unit number for an individual unit within a multi-dwelling unit, @examples: 1|101", "default": null },
                  { "name": "type", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the type of the unit, @examples: Apartment|Building", "default": null },
                  { "name": "story", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the story or floor number for the unit, @examples: 1|2|3", "default": null },
                  { "name": "fiberCount", "type": [ "null", { "type": "string", "avro.java.string": "String" } ], "doc": "@required: false, @description: the number of fibers available at the unit, @examples: 1|4", "default": null }
                ]
              }
            }
          ],
          "doc": "@required: false, @description: unit numbers will be available for multi-dwelling unit - demand points, @examples: unit number details",
          "default": null
        },
        {
          "name": "constructionIndicator",
          "type": { "type": "string", "avro.java.string": "String" },
          "doc": "@required: true, @description: construction stages (yes means in construction stage and no means in completed stage), @examples: yes|no"
        }
      ]
    }

Generate the associated C# code files using avrogen.
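
For example, assuming the schema above was saved as NewConstructionAddressEvent.avsc (the file name is illustrative; avrogen is the code generator that ships with the Apache Avro C# tooling):

    avrogen -s NewConstructionAddressEvent.avsc .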

Now, execute the consumer code (after publishing):

    public void Consume()
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            SchemaRegistryUrl = schemaRegistryUrl,
            // optional schema registry client properties:
            SchemaRegistryRequestTimeoutMs = schemaRegistryRequestTimeoutMs,
            SchemaRegistryMaxCachedSchemas = schemaRegistryMaxCachedSchemas
        };

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Latest,
            GroupId = groupID // e.g. "Test", or Guid.NewGuid().ToString()
        };

        using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        using (var consumer =
            new ConsumerBuilder<string, NewConstructionAddressEvent>(consumerConfig)
                .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                .SetValueDeserializer(new AvroDeserializer<NewConstructionAddressEvent>(schemaRegistry).AsSyncOverAsync())
                .SetErrorHandler((_, e) => logger.Error($"Error: {e.Reason}"))
                .Build())
        {
            try
            {
                logger.Info($"Starting consumer.subscribe.");

                consumer.Subscribe(topicName);

                while (true)
                {
                    try
                    {
                        logger.Info($"Starting: consumer.Consume");
                        var consumeResult = consumer.Consume(Executor.ApplicationCancelToken.Token);

                        string key = consumeResult.Key;
                        logger.Info($"Key: {key}, constructionAddressId: {consumeResult.Value.constructionAddressId}");
                    }
                    catch (OperationCanceledException)
                    {
                        logger.Info($"OperationCancelled for consumer.Consume");
                        break;
                    }
                    catch (ConsumeException e)
                    {
                        logger.Error(e, $"Consume error: {e.Error.Reason}");
                        break;
                    }
                }
            }
            catch (Exception ex)
            {
                logger.Error(ex, $"Consume error: {ex.Message}");
            }
            finally
            {
                consumer.Close();
            }
        }
    }

The consumer can successfully read the Kafka message as a GenericRecord, but using a SpecificRecord, as in the code snippet above:

            new ConsumerBuilder<string, NewConstructionAddressEvent>(consumerConfig)
                .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                .SetValueDeserializer(new AvroDeserializer<NewConstructionAddressEvent>(schemaRegistry).AsSyncOverAsync())
                .SetErrorHandler((_, e) => logger.Error($"Error: {e.Reason}"))
                .Build()

This results in the following exception:

    Confluent.Kafka.ConsumeException: Local: Value deserialization error ---> Avro.AvroException: Unable to find type [email protected] in all loaded assemblies in field emails
       at Avro.Specific.SpecificDefaultReader.ReadRecord(Object reuse, RecordSchema writerSchema, Schema readerSchema, Decoder dec)
       at Avro.Generic.DefaultReader.Read[T](T reuse, Decoder decoder)
       at Confluent.SchemaRegistry.Serdes.SpecificDeserializerImpl`1.Deserialize(String topic, Byte[] array)
       at Confluent.SchemaRegistry.Serdes.AvroDeserializer`1.DeserializeAsync(ReadOnlyMemory`1 data, Boolean isNull, SerializationContext context)
       at Confluent.Kafka.SyncOverAsync.SyncOverAsyncDeserializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context)
       at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
       --- End of inner exception stack trace ---
       at Confluent.Kafka.Consumer`2.ConsumeImpl[K,V](Int32 millisecondsTimeout, IDeserializer`1 keyDeserializer, IDeserializer`1 valueDeserializer)
       at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)

I believe that, for .NET C#, if a schema namespace contains a C# keyword (e.g. event), then AvroDeserializer<T> fails.

NuGet versions: Confluent.Kafka 1.1.0, Confluent.Kafka.Avro 0.11.6, Confluent.SchemaRegistry 1.1.0, Confluent.SchemaRegistry.Serdes 1.1.0

Operating system/client configuration: Windows, .NET Core 2.2, C#

I think the key to reproducing this issue is to create an Avro schema whose namespace includes the word "event", and which includes a "sub-schema" of an array of objects - please see the "Schema example" provided above. Then create a producer (using AvroSerializer) and a consumer (using AvroDeserializer).

ghost avatar Aug 19 '19 02:08 ghost

I haven't had a chance to look closely, but I suspect this is an issue with the Avro library. Thanks for reporting.

mhowlett avatar Aug 19 '19 15:08 mhowlett

I created a console application that produces and consumes using the schema provided in the issue. I used the same client library version and dotnet version, and ran the app on Windows. Everything works fine, and I think it is not a problem with the Avro library. You can run the sample app easily:

https://github.com/Mousavi310/confluent-kafka-dotnet-issues/tree/master/Issue1034

You just need to run the following command (in the Issue1034 directory):

    dotnet run

The sample code uses AutoOffsetReset.Earliest and is located in the KafkaHostedService file. I didn't use the Confluent.Kafka.Avro NuGet package; as the changelog notes, Confluent.Kafka.Avro has been renamed to Confluent.SchemaRegistry.Serdes.

Mousavi310 avatar Oct 10 '19 11:10 Mousavi310

It is completely unclear to me how to replace Confluent.Kafka.Avro with Confluent.SchemaRegistry.Serdes. There is a mismatch when trying to assign the value deserializer:

    Argument 1: cannot convert from 'Confluent.SchemaRegistry.Serdes.AvroDeserializer<*>' to 'Confluent.Kafka.IDeserializer<*>'

Rolice avatar Nov 25 '19 10:11 Rolice

Confluent.SchemaRegistry.Serdes is for use with the v1.0+ API only; Confluent.Kafka.Avro is for use with the 0.11.x API. We highly recommend making the switch to the new API, as 0.11.x is now a long way behind in terms of bug fixes, robustness, and features.
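
The conversion error above comes from passing the async deserializer directly: AvroDeserializer<T> implements IAsyncDeserializer<T>, while SetValueDeserializer expects an IDeserializer<T>, so it has to be wrapped with AsSyncOverAsync(), as in the snippet in the original report. A minimal sketch (OrderEvent is a placeholder for any avrogen-generated class; schemaRegistry is a CachedSchemaRegistryClient and consumerConfig is as in the snippets above):

    using Confluent.Kafka.SyncOverAsync;   // provides the AsSyncOverAsync() extension
    using Confluent.SchemaRegistry.Serdes;

    var consumer = new ConsumerBuilder<string, OrderEvent>(consumerConfig)
        // AsSyncOverAsync() adapts IAsyncDeserializer<T> to the IDeserializer<T>
        // that SetValueDeserializer expects.
        .SetValueDeserializer(new AvroDeserializer<OrderEvent>(schemaRegistry).AsSyncOverAsync())
        .Build();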

mhowlett avatar Nov 25 '19 21:11 mhowlett

Thanks for the information!

What I do not understand is the versioning between the two available sets. Does ConsumerBuilder come from 0.11.x? Is there a replacement?

I am experimenting now with .NET core and Kafka.

What I am trying to do now is something like:

    .SetValueDeserializer(new Confluent.SchemaRegistry.Serdes.AvroDeserializer<OrderEvent<Order>>(schemaConfig))

The method SetValueDeserializer expects Confluent.Kafka.IDeserializer. Maybe I am doing something wrong, but I am unable to find up-to-date documentation and examples (especially about the transition from 0.11.x to 1.0.0).

Rolice avatar Nov 26 '19 14:11 Rolice

Hello @mhowlett, we are also facing this problem with the latest versions of all the libraries.

Keywords like event in the namespace cause a deserialization problem.

codebased avatar Jul 02 '20 23:07 codebased

Hi @mhowlett, is there any update on this issue? I am facing a similar issue when there is a C# keyword (event) in the namespace. I am using the latest versions of the libraries.

vkhose avatar Jul 03 '20 00:07 vkhose

Hi @codebased & @vkhose - I'm interested that you both report the same namespace issue, that is: using a keyword like "event" in the schema namespace. This is the first I have heard that other parties have experienced the same issue. As you can see from the above chain, @Mousavi310 confirmed in a response to me, that confluent was UNABLE to reproduce the issue I originally submitted. I therefore assumed it must have been something my end - I never had time to further investigate (EG: I was wondering if our Kafka installation version was something to look into, since that was the only other variable - we agreed on our DotNetCore and Confluent client versions when he tried to reproduce the issue). Due to other pressures, I made no further investigations, and my solution was to give up on using the keyword "event" in our Kafka schema's namespace. Sorry this is not more helpful - all I would suggest is you both provide the similar details I provided in this issue: Confluent folks will at least have additional data/sample to review in trying to identify this issue.

paulctl avatar Jul 03 '20 00:07 paulctl

Thanks @paulctl. I am using the following libraries for my consumer: Confluent.Kafka v1.4.3, Confluent.SchemaRegistry v1.4.3, Confluent.SchemaRegistry.Serdes.Avro v1.4.3.

I am using the avrogen tool (v1.9.0.0) to generate C# files from the Avro schema. The generated files have a namespace like au.com.example.company.@event - because "event" is a C# keyword, avrogen escapes it with @.

The deserializer is configured as:

    .SetValueDeserializer(new AvroDeserializer<TValue>(schemaRegistry).AsSyncOverAsync());

When I try to consume events with this setup, I get exactly the same error as mentioned above.

P.S. This happens only when I have an array of complex objects in my message.

vkhose avatar Jul 03 '20 00:07 vkhose

This issue is easily reproducible by modifying one of the deserialization unit test cases to provide a complex object with an array of another complex type as a child element, with both the parent and child classes in a namespace containing @event.

Sample code for the test case and Avro class: https://github.com/vkhose/samplecode/tree/master

It looks like the same utility methods are used both to generate code for a specific record and to read the record during deserialization: https://github.com/apache/avro/blob/c0094b5bb1abb79304ce42a56cc115186370d407/lang/csharp/src/apache/main/CodeGen/CodeGenUtil.cs#L99

    if (ReservedKeywords.Contains(names[i])) builder.Append(At);

Appending At (@) while generating code makes sense, but it should not be appended while deserializing. Deserialization fails because the C# runtime does not expect the At (@) character in the string value used to create the class instance: https://github.com/apache/avro/blob/master/lang/csharp/src/apache/main/Specific/ObjectCreator.cs#L165

The string parameter passed to Type.GetType(string) should not contain the "@" character in the namespace name.
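
To make the failure concrete, here is a minimal, self-contained repro (the namespace and type names are hypothetical): the compiler strips the @ escape from the CLR name, so a lookup string that keeps the @ matches nothing and Type.GetType returns null.

    using System;

    namespace au.com.example.@event
    {
        public class Unit { }
    }

    public static class Program
    {
        public static void Main()
        {
            // The CLR namespace is "au.com.example.event" (no @), so this succeeds.
            Console.WriteLine(Type.GetType("au.com.example.event.Unit") != null);  // True

            // A lookup string that keeps the @ (as Avro builds it from the
            // generated code) matches no CLR type, so GetType returns null.
            Console.WriteLine(Type.GetType("au.com.example.@event.Unit") != null); // False
        }
    }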

vkhose avatar Jul 03 '20 12:07 vkhose

> I created a console application that produces and consumes using the schema provided in the issue. [...] You can run the sample app easily: https://github.com/Mousavi310/confluent-kafka-dotnet-issues/tree/master/Issue1034

Your example works fine, but if you update the NuGet packages to the latest v1.5.0, you start getting "Local: Key deserialization error" on the consume operation. I have tested this with your example code.

gaurravprakash avatar Aug 05 '20 20:08 gaurravprakash

Same issue for 1.7.0. Is there any plan to fix this defect?

leszek95b avatar Jun 29 '21 08:06 leszek95b

This is a bug in the Apache.Avro project. I would recommend using Protobuf or JSON, which both have reliable implementations in .NET. Or you could try this alternative Avro implementation: https://github.com/ch-robinson/dotnet-avro. We do not link to this library because it doesn't have a community around it, but it was built out of frustration with the quality of the Apache.Avro implementation, so there's a good chance it is better.

mhowlett avatar Jun 29 '21 13:06 mhowlett

Thanks @mhowlett. I created a Jira ticket on Apache.Avro a long time ago, but it doesn't look like it is being fixed. I had a workaround for my implementation last time, but I will try your suggestions too.

vkhose avatar Jun 30 '21 01:06 vkhose

Hi @vkhose, can you please confirm that the following is the issue raised on Apache.Avro - https://issues.apache.org/jira/browse/AVRO-2888 - just to be able to keep an eye on it? Thanks.

camtechnet avatar Aug 03 '21 07:08 camtechnet

> Hi @vkhose, can you please confirm that the following is the issue raised on Apache.Avro - https://issues.apache.org/jira/browse/AVRO-2888 [...]

Yes. This is the one.

vkhose avatar Aug 03 '21 10:08 vkhose

> [...] I had a workaround for my implementation last time. [...]

Can you describe your workaround implementation, please?

razvanreb avatar Feb 09 '22 08:02 razvanreb

@razvanreb - the workaround was not very good and involved manually updating the autogenerated classes. I would not recommend any workaround now, as this issue is fixed in the Apache.Avro library (v1.11.0). More details here: https://issues.apache.org/jira/browse/AVRO-2888 and https://issues.apache.org/jira/browse/AVRO-3075. The Apache.Avro version update is pending release for this repo (already updated in master); it will be available with Confluent.SchemaRegistry.Serdes.Avro soon.

vkhose avatar Mar 01 '22 05:03 vkhose

The problem is caused by this code in Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs, line 146:

    return datumReader.Read(default(T), new BinaryDecoder(stream));

Even though the deserializer class knows exactly which type it needs to deserialize to, it always passes default(T), which is null for .NET reference types, to the datumReader. This causes the DatumReader to create a new instance using the namespace and class name from the schema.

The solution is to use Activator.CreateInstance<T>() instead of default(T) for classes that implement the ISpecificRecord interface.
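
A minimal sketch of that change (not the exact patch; it assumes the surrounding SpecificDeserializerImpl code quoted above, and relies on the avrogen-generated classes having parameterless constructors):

    // Hand the DatumReader a pre-constructed instance so Avro's ObjectCreator
    // never has to resolve the (@-mangled) type name from the writer schema.
    T reuse = typeof(Avro.Specific.ISpecificRecord).IsAssignableFrom(typeof(T))
        ? Activator.CreateInstance<T>()
        : default(T);
    return datumReader.Read(reuse, new BinaryDecoder(stream));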

BTW, the Azure SDK, which also uses the Avro library, has this implemented correctly.

@mhowlett Let me know if you want me to create a PR for this code change.

sergemat avatar Jun 20 '22 21:06 sergemat

@sergemat - yep, sounds good. can you include a test case? we can target 1.9.1, which will probably be coming out relatively soon.

mhowlett avatar Jun 21 '22 12:06 mhowlett

@mhowlett OK. Please review PR #1847

sergemat avatar Jun 21 '22 21:06 sergemat