gogen-avro icon indicating copy to clipboard operation
gogen-avro copied to clipboard

Supporting confluent compliant schema registry

Open MrDHat opened this issue 5 years ago • 2 comments

We use schema registry to maintain our schemas & versions. It would be great to have an integration with the same.

This is what I propose the solution should look like:

  • Generator to accept 2 new arguments:
    • Schema Registry URL
    • Schema subject name in the registry
    • Schema version
  • Generated code should talk to the schema registry instead of using the schema text in the code to get the schema
  • While deserializing, it strips out confluent specific bytes (first 5) and deserializes the object.
  • Serializer prepends 5 extra bytes to make it compatible with the schema registry

I am happy to take it up and open a PR since we will be using it in-house very actively.

MrDHat avatar Apr 23 '19 10:04 MrDHat

Hi @MrDHat! This seems like a very useful change, I would make a few comments:

Generator to accept 2 new arguments:

  • Schema Registry URL
  • Schema subject name in the registry
  • Schema version

If I understand it right, the goal here is to generate a new struct for a schema you get from the schema registry (instead of reading it from a .avsc file)? That seems fine as an extra, optional set of arguments for the struct-generating code. We should include the schema subject and version into the generated code as well.

Generated code should talk to the schema registry instead of using the schema text in the code to get the schema

I don't understand why this would change? The generated struct is tied to the specific schema it was generated with, once we fetch the schema text at generation-time why would we look it up again at runtime?

While deserializing, it strips out confluent specific bytes (first 5) and deserializes the object.

I would implement this as a Confluent schema registry client that wraps the existing VM, with a method like (pseudocode):

import (
    "github.com/actgardner/gogen-avro/vm"
    "github.com/actgardner/gogen-avro/compiler"
)

// r is a stream of bytes including the Confluent framing data
// target is an instance the generated struct 
func (c *ConfluentClient) Deserialize(r io.Reader, target types.Field) (err error) {
    // Read the first 5 bytes to get the schema ID for the incoming record from r
    schemaId, err := c.readSchemaId(r)

    // Look up the schema in the schema registry - we should also probably cache these
    writerSchema, err := c.GetSchemaForId(schemaId)

    // Compile a deserializer given the Writer schema from the Confluent registry and the Reader schema used to generate the struct
    program, err := compiler.CompileBytes(writerSchema, target.Schema())

    // Evaluate the deserializer to populate the fields in `target`
    return vm.Eval(r, program, target)
}

Serializer prepends 5 extra bytes to make it compatible with the schema registry

Every generated struct has a method like func (r *<GeneratedType>) Serialize(w io.Writer) error, we should be able to create an interface with the generated methods that provide metadata for the schema registry and the serialize method. Then you could write a small method that serializes any generated record that fulfills this schema - any record generated from a schema in a given confluent registry.

actgardner avatar May 04 '19 04:05 actgardner

Are you still looking for help on this?

robhoeting-kr avatar Aug 11 '21 16:08 robhoeting-kr