confluent-kafka-dotnet icon indicating copy to clipboard operation
confluent-kafka-dotnet copied to clipboard

Schema with oneOf fails to validate

Open AntonyLittle opened this issue 1 year ago • 5 comments

Description

I have a schema to support a Kafka message that makes use of polymorphism. However, the schema always fails to validate, giving the error below, despite validating fine at https://www.jsonschemavalidator.net/ . Am I doing something wrong?

Exception thrown: 'Confluent.Kafka.ProduceException`2' in System.Private.CoreLib.dll: 'Local: Value serialization error'
 Inner exceptions found, see $exception in variables window for more details.
 Innermost exception 	 System.IO.InvalidDataException : Schema validation failed for properties: [#/Payload.Message]
   at Confluent.SchemaRegistry.Serdes.JsonSerializer`1.<SerializeAsync>d__18.MoveNext()
   at Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.Serialize(T data, SerializationContext context)
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__57.MoveNext()

My serializer config is:

        // Registers the value serializer for OutputMessage: a JsonSerializer backed by a
        // CachedSchemaRegistryClient, adapted with AsSyncOverAsync() before being passed
        // to the builder.
        builder.SetValueSerializer(
            new JsonSerializer<OutputMessage>(
                new CachedSchemaRegistryClient(
                    new SchemaRegistryConfig
                    {
                        // Schema Registry endpoint used by the client.
                        Url = "localhost:8081"
                    }
                ), 
                // NOTE(review): presumably these settings make the serializer use the latest
                // schema already registered for the subject (rather than registering one
                // generated from OutputMessage), with the subject named after topic + record
                // type — confirm against the Confluent.SchemaRegistry documentation.
                new JsonSerializerConfig { 
                    AutoRegisterSchemas = false, 
                    UseLatestVersion = true,
                    SubjectNameStrategy = SubjectNameStrategy.TopicRecord
                }
            ).AsSyncOverAsync()
        );

The schema and classes are as follows:

Schema

{
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "OutputMessage",
    "type": "object",
    "additionalProperties": false,
    "required": [
        "Payload"
    ],
    "properties": {
        "Payload": {
            "oneOf": [
                {
                    "$ref": "#/definitions/AllTheThings"
                },
                {
                    "$ref": "#/definitions/AllTheThingsKafkaMessagePayloadA"
                },
                {
                    "$ref": "#/definitions/AllTheThingsOutputPayload"
                }
            ]
        }
    },
    "definitions": {
        "AllTheThings": {
            "type": "object",
            "additionalProperties": false,
            "required": [
                "MessageType"
            ],
            "properties": {
                "MessageType": {
                    "type": "string",
                    "minLength": 1
                }
            }
        },
        "AllTheThingsKafkaMessagePayloadA": {
            "title": "AllTheThingsKafkaMessagePayloadA",
            "type": "object",
            "additionalProperties": false,
            "required": [
                "MessageType",
                "Message"
            ],
            "properties": {
                "MessageType": {
                    "type": "string",
                    "minLength": 1
                },
                "Message": {
                    "type": "string",
                    "minLength": 1
                }
            }
        },
        "AllTheThingsOutputPayload": {
            "title": "AllTheThingsOutputPayload",
            "type": "object",
            "additionalProperties": false,
            "required": [
                "MessageType",
                "Wombats"
            ],
            "properties": {
                "MessageType": {
                    "type": "string",
                    "minLength": 1
                },
                "Hairyness": {
                    "type": [
                        "null",
                        "number"
                    ],
                    "format": "double",
                    "maximum": 100.0
                },
                "Wombats": {
                    "type": "integer",
                    "format": "int32",
                    "maximum": 2147483647.0,
                    "minimum": 5.0
                },
                "SomeText": {
                    "type": [
                        "null",
                        "string"
                    ]
                }
            }
        }
    }
}

Classes


/// <summary>
/// Base payload type. The JSON polymorphism attributes name <c>MessageType</c> as the
/// type-discriminator property and declare two derived payload types.
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "MessageType")]
[JsonDerivedType(typeof(AllTheThingsKafkaMessagePayloadA), typeDiscriminator: "KafkaMessagePayloadA")]
[JsonDerivedType(typeof(AllTheThingsOutputPayload), typeDiscriminator: "OutputPayload")]
public class AllTheThings : IEquatable<AllTheThings>
{
    /// <summary>
    /// Message type name. Marked <c>[Required]</c>, but the property has no initializer,
    /// so it is null on a default-constructed instance.
    /// </summary>
    [Required]
    public string MessageType { get; set; }

    /// <summary>
    /// Compares this instance with <paramref name="other"/> by <see cref="MessageType"/>.
    /// </summary>
    /// <param name="other">The instance to compare with; may be null.</param>
    /// <returns>
    /// <c>false</c> when <paramref name="other"/> is null; otherwise <c>true</c> when both
    /// <see cref="MessageType"/> values are ordinally equal (two nulls compare equal).
    /// </returns>
    public bool Equals(AllTheThings? other)
    {
        if (other is null)
        {
            return false;
        }

        // Static string.Equals is null-safe; the instance call threw a
        // NullReferenceException when MessageType had not been set.
        return string.Equals(MessageType, other.MessageType, StringComparison.Ordinal);
    }

    // NOTE(review): Equals(object) and GetHashCode() are not overridden here, so hash-based
    // collections and object.Equals still use reference semantics. Overriding them should be
    // done for the whole hierarchy in one coordinated change.
}

    
/// <summary>
/// Payload that carries a free-text message in addition to the base members.
/// </summary>
public class AllTheThingsKafkaMessagePayloadA : AllTheThings, IEquatable<AllTheThingsKafkaMessagePayloadA>
{
    /// <summary>
    /// Any old text. Marked <c>[Required]</c>, but the property has no initializer,
    /// so it is null on a default-constructed instance.
    /// </summary>
    [Required]
    public string Message { get; set; }

    /// <summary>
    /// Compares this instance with <paramref name="other"/> using the base-class comparison
    /// followed by <see cref="Message"/>.
    /// </summary>
    /// <param name="other">The instance to compare with; may be null.</param>
    /// <returns>
    /// <c>false</c> when <paramref name="other"/> is null; otherwise <c>true</c> when the
    /// base comparison succeeds and both <see cref="Message"/> values are ordinally equal
    /// (two nulls compare equal).
    /// </returns>
    public bool Equals(AllTheThingsKafkaMessagePayloadA? other)
    {
        if (other is null)
        {
            return false;
        }

        // Static string.Equals is null-safe; the instance call threw a
        // NullReferenceException when Message had not been set.
        return base.Equals(other)
            && string.Equals(Message, other.Message, StringComparison.Ordinal);
    }
}

    
/// <summary>
/// Payload describing wombats: an optional hairiness percentage, a required wombat count
/// and optional free text, in addition to the base members.
/// </summary>
public class AllTheThingsOutputPayload : AllTheThings, IEquatable<AllTheThingsOutputPayload>
{
    /// <summary>Amount of hair as a percentage; optional, upper bound 100.</summary>
    [Range(Double.MinValue, 100)]
    public Double? Hairyness { get; set; }

    /// <summary>Number of wombats; required, lower bound 5.</summary>
    [Required]
    [Range(5, Int32.MaxValue)]
    public Int32 Wombats { get; set; }

    /// <summary>Optional free text.</summary>
    public string? SomeText { get; set; }

    /// <summary>
    /// Member-wise comparison with <paramref name="other"/>: base members first, then
    /// <see cref="Hairyness"/>, <see cref="Wombats"/> and <see cref="SomeText"/>.
    /// </summary>
    /// <param name="other">The instance to compare with; may be null.</param>
    /// <returns>
    /// <c>false</c> when <paramref name="other"/> is null or any member differs;
    /// <c>true</c> otherwise. Two null optional values compare equal.
    /// </returns>
    public bool Equals(AllTheThingsOutputPayload? other)
    {
        if (other is null)
        {
            return false;
        }

        if (!base.Equals(other))
        {
            return false;
        }

        if (!Nullable.Equals(Hairyness, other.Hairyness))
        {
            return false;
        }

        if (Wombats != other.Wombats)
        {
            return false;
        }

        return string.Equals(SomeText, other.SomeText);
    }
}

How to reproduce

Register the above schema and attempt to produce to the relevant topic with an instance of OutputMessage as the value

Checklist

Please provide the following information:

  • [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • [ ] Confluent.Kafka nuget version.
  • [ ] Apache Kafka version.
  • [ ] Client configuration.
  • [ ] Operating system.
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • [ ] Critical issue.

AntonyLittle avatar Jul 31 '23 09:07 AntonyLittle

Worked around this by implementing an ISchemaProcessor

AntonyLittle avatar Aug 07 '23 15:08 AntonyLittle

@AntonyLittle, can you share how you resolved it using ISchemaProcessor? We are stuck on a similar issue. Thank you.

prabhpahul avatar Aug 07 '23 15:08 prabhpahul

I'm not sure if I am permitted to share code with you, but if I can, I will. For now, here is a high level description of how I did it:

  1. Implement ISchemaProcessor interface in a class called OneOfProcessor.
  2. Add an instance of OneOfProcessor to the SchemaProcessors member of a JsonSchemaGeneratorSettings instance.
  3. Pass your JsonSchemaGeneratorSettings instance to both your schema generation code and your Producer configuration.

AntonyLittle avatar Aug 07 '23 15:08 AntonyLittle

  1. ISchemaProcessor

@AntonyLittle I understand not being able to share code, I am bound by that limitation myself. Can you tell me though what the implementation of the OneOfProcessor.Process() method is doing?

I was also able to work around my problem, which is a bit different than yours, by setting JsonSchemaGeneratorSettings.FlattenInheritanceHierarchy to true.

bobcat1506 avatar Oct 26 '23 17:10 bobcat1506

The OneOfProcessor searches for polymorphic types and adds them as OneOf values :)

AntonyLittle avatar Oct 26 '23 17:10 AntonyLittle