gogen-avro
gogen-avro copied to clipboard
Supporting confluent compliant schema registry
We use schema registry to maintain our schemas & versions. It would be great to have an integration with the same.
This is what I propose the solution should look like:
- Generator to accept 2 new arguments:
- Schema Registry URL
- Schema subject name in the registry
- Schema version
- Generated code should talk to the schema registry instead of using the schema text in the code to get the schema
- While deserializing, it strips out confluent specific bytes (first 5) and deserializes the object.
- Serializer prepends 5 extra bytes to make it compatible with the schema registry
I am happy to take it up and open a PR since we will be using it in-house very actively.
Hi @MrDHat! This seems like a very useful change, I would make a few comments:
Generator to accept 2 new arguments:
- Schema Registry URL
- Schema subject name in the registry
- Schema version
If I understand it right, the goal here is to generate a new struct for a schema you get from the schema registry (instead of reading it from a .avsc file)? That seems fine as an extra, optional set of arguments for the struct-generating code. We should include the schema subject and version into the generated code as well.
Generated code should talk to the schema registry instead of using the schema text in the code to get the schema
I don't understand why this would change? The generated struct is tied to the specific schema it was generated with, once we fetch the schema text at generation-time why would we look it up again at runtime?
While deserializing, it strips out confluent specific bytes (first 5) and deserializes the object.
I would implement this as a Confluent schema registry client that wraps the existing VM, with a method like (pseudocode):
import (
"github.com/actgardner/gogen-avro/vm"
"github.com/actgardner/gogen-avro/compiler"
)
// r is a stream of bytes including the Confluent framing data
// target is an instance the generated struct
func (c *ConfluentClient) Deserialize(r io.Reader, target types.Field) (err error) {
// Read the first 5 bytes to get the schema ID for the incoming record from r
schemaId, err := c.readSchemaId(r)
// Look up the schema in the schema registry - we should also probably cache these
writerSchema, err := c.GetSchemaForId(schemaId)
// Compile a deserializer given the Writer schema from the Confluent registry and the Reader schema used to generate the struct
program, err := compiler.CompileBytes(writerSchema, target.Schema())
// Evaluate the deserializer to populate the fields in `target`
return vm.Eval(r, program, target)
}
Serializer prepends 5 extra bytes to make it compatible with the schema registry
Every generated struct has a method like func (r *<GeneratedType>) Serialize(w io.Writer) error
, we should be able to create an interface with the generated methods that provide metadata for the schema registry and the serialize method. Then you could write a small method that serializes any generated record that fulfills this schema - any record generated from a schema in a given confluent registry.
Are you still looking for help on this?