confluent-kafka-dotnet
confluent-kafka-dotnet copied to clipboard
Schema with oneOf fails to validate
Description
I have a schema to support a Kafka message that makes use of polymorphism. However, the schema always fails to validate, giving the error below, despite validating fine at https://www.jsonschemavalidator.net/. Am I doing something wrong?
Exception thrown: 'Confluent.Kafka.ProduceException`2' in System.Private.CoreLib.dll: 'Local: Value serialization error'
Inner exceptions found, see $exception in variables window for more details.
Innermost exception System.IO.InvalidDataException : Schema validation failed for properties: [#/Payload.Message]
at Confluent.SchemaRegistry.Serdes.JsonSerializer`1.<SerializeAsync>d__18.MoveNext()
at Confluent.Kafka.SyncOverAsync.SyncOverAsyncSerializer`1.Serialize(T data, SerializationContext context)
at Confluent.Kafka.Producer`2.<ProduceAsync>d__57.MoveNext()
My serializer config is:
// Registers a Schema Registry-backed JSON serializer for OutputMessage values.
// The serializer is asynchronous; AsSyncOverAsync() wraps it so it can be passed
// to SetValueSerializer.
builder.SetValueSerializer(
new JsonSerializer<OutputMessage>(
// Schema Registry client pointed at a local registry instance.
new CachedSchemaRegistryClient(
new SchemaRegistryConfig
{
Url = "localhost:8081"
}
),
// NOTE(review): with AutoRegisterSchemas = false and UseLatestVersion = true the
// schema is presumably expected to already be registered under the subject and the
// latest registered version is looked up — confirm against the Confluent serializer docs.
new JsonSerializerConfig {
AutoRegisterSchemas = false,
UseLatestVersion = true,
// NOTE(review): TopicRecord presumably derives the subject from topic + record name — confirm.
SubjectNameStrategy = SubjectNameStrategy.TopicRecord
}
).AsSyncOverAsync()
);
The schema and classes are as follows:
Schema
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "OutputMessage",
"type": "object",
"additionalProperties": false,
"required": [
"Payload"
],
"properties": {
"Payload": {
"oneOf": [
{
"$ref": "#/definitions/AllTheThings"
},
{
"$ref": "#/definitions/AllTheThingsKafkaMessagePayloadA"
},
{
"$ref": "#/definitions/AllTheThingsOutputPayload"
}
]
}
},
"definitions": {
"AllTheThings": {
"type": "object",
"additionalProperties": false,
"required": [
"MessageType"
],
"properties": {
"MessageType": {
"type": "string",
"minLength": 1
}
}
},
"AllTheThingsKafkaMessagePayloadA": {
"title": "AllTheThingsKafkaMessagePayloadA",
"type": "object",
"additionalProperties": false,
"required": [
"MessageType",
"Message"
],
"properties": {
"MessageType": {
"type": "string",
"minLength": 1
},
"Message": {
"type": "string",
"minLength": 1
}
}
},
"AllTheThingsOutputPayload": {
"title": "AllTheThingsOutputPayload",
"type": "object",
"additionalProperties": false,
"required": [
"MessageType",
"Wombats"
],
"properties": {
"MessageType": {
"type": "string",
"minLength": 1
},
"Hairyness": {
"type": [
"null",
"number"
],
"format": "double",
"maximum": 100.0
},
"Wombats": {
"type": "integer",
"format": "int32",
"maximum": 2147483647.0,
"minimum": 5.0
},
"SomeText": {
"type": [
"null",
"string"
]
}
}
}
}
}
Classes
/// <summary>
/// Base payload type. The attributes below declare <c>MessageType</c> as the
/// polymorphic type discriminator and list the known derived payload types.
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "MessageType")]
[JsonDerivedType(typeof(AllTheThingsKafkaMessagePayloadA), typeDiscriminator: "KafkaMessagePayloadA")]
[JsonDerivedType(typeof(AllTheThingsOutputPayload), typeDiscriminator: "OutputPayload")]
public class AllTheThings : IEquatable<AllTheThings>
{
    /// <summary>
    /// Name identifying the kind of message carried by this payload.
    /// </summary>
    [Required]
    public string MessageType { get; set; }

    /// <summary>
    /// Compares this payload with <paramref name="other"/> by <see cref="MessageType"/>.
    /// </summary>
    /// <param name="other">The payload to compare against; may be null.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="other"/> is not null and both
    /// <see cref="MessageType"/> values are equal (ordinal comparison, two nulls
    /// compare equal); otherwise <c>false</c>.
    /// </returns>
    public bool Equals(AllTheThings? other)
    {
        if (other is null)
        {
            return false;
        }

        // The static string.Equals is null-safe: MessageType is null until it is
        // assigned, and calling the instance Equals on it would throw.
        return string.Equals(MessageType, other.MessageType, StringComparison.Ordinal);
    }
}
/// <summary>
/// Payload variant that carries a free-text message in addition to the
/// inherited <c>MessageType</c>.
/// </summary>
public class AllTheThingsKafkaMessagePayloadA : AllTheThings, IEquatable<AllTheThingsKafkaMessagePayloadA>
{
    /// <summary>
    /// Any old text
    /// </summary>
    [Required]
    public string Message { get; set; }

    /// <summary>
    /// Compares this payload with <paramref name="other"/> using the base-class
    /// comparison plus <see cref="Message"/>.
    /// </summary>
    /// <param name="other">The payload to compare against; may be null.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="other"/> is not null, the base-class
    /// comparison succeeds, and both <see cref="Message"/> values are equal
    /// (ordinal comparison, two nulls compare equal); otherwise <c>false</c>.
    /// </returns>
    public bool Equals(AllTheThingsKafkaMessagePayloadA? other)
    {
        if (other is null)
        {
            return false;
        }

        // The static string.Equals is null-safe: Message is null until it is
        // assigned, and calling the instance Equals on it would throw.
        return base.Equals(other)
            && string.Equals(Message, other.Message, StringComparison.Ordinal);
    }
}
/// <summary>
/// Payload variant describing wombats: a required count plus optional
/// hairyness and free text, in addition to the inherited <c>MessageType</c>.
/// </summary>
public class AllTheThingsOutputPayload : AllTheThings, IEquatable<AllTheThingsOutputPayload>
{
    /// <summary>
    /// Amount of hair as a percentage
    /// </summary>
    [Range(Double.MinValue, 100)]
    public Double? Hairyness { get; set; }

    /// <summary>
    /// Number of wombats
    /// </summary>
    [Required]
    [Range(5, Int32.MaxValue)]
    public Int32 Wombats { get; set; }

    public string? SomeText { get; set; }

    /// <summary>
    /// Compares this payload with <paramref name="other"/> field by field.
    /// </summary>
    /// <param name="other">The payload to compare against; may be null.</param>
    /// <returns>
    /// <c>true</c> when <paramref name="other"/> is not null, the base-class
    /// comparison succeeds, and Hairyness, Wombats and SomeText all match
    /// (two nulls count as a match); otherwise <c>false</c>.
    /// </returns>
    public bool Equals(AllTheThingsOutputPayload? other)
    {
        if (other is null)
        {
            return false;
        }

        // Checks run in the same order as the fields are declared; the first
        // mismatch short-circuits the rest.
        if (!base.Equals(other))
        {
            return false;
        }

        if (!Nullable.Equals(Hairyness, other.Hairyness))
        {
            return false;
        }

        if (Wombats != other.Wombats)
        {
            return false;
        }

        return string.Equals(SomeText, other.SomeText);
    }
}
How to reproduce
Register the above schema and attempt to produce to the relevant topic with an instance of OutputMessage as the value.
Checklist
Please provide the following information:
- [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
- [ ] Confluent.Kafka nuget version.
- [ ] Apache Kafka version.
- [ ] Client configuration.
- [ ] Operating system.
- [ ] Provide logs (with "debug" : "..." as necessary in configuration).
- [ ] Provide broker log excerpts.
- [ ] Critical issue.
Worked around this by implementing an ISchemaProcessor
@AntonyLittle, can you share how you resolved it using ISchemaProcessor? We are stuck on a similar issue. Thank you
I'm not sure if I am permitted to share code with you, but if I can, I will. For now, here is a high level description of how I did it:
- Implement the `ISchemaProcessor` interface in a class called `OneOfProcessor`.
- Add an instance of `OneOfProcessor` to the `SchemaProcessors` member of a `JsonSchemaGeneratorSettings` instance.
- Pass your `JsonSchemaGeneratorSettings` instance to both your schema generation code and your Producer configuration.
ISchemaProcessor
@AntonyLittle I understand not being able to share code, I am bound by that limitation myself. Can you tell me though what the implementation of the OneOfProcessor.Process() method is doing?
I was also able to work around my problem, which is a bit different than yours, by setting JsonSchemaGeneratorSettings.FlattenInheritanceHierarchy to true.
The `OneOfProcessor` searches for polymorphic types and adds them as `oneOf` values :)