confluent-schema-registry icon indicating copy to clipboard operation
confluent-schema-registry copied to clipboard

Implement support for Protocol Buffer references (imports)

Open brinchj opened this issue 3 years ago • 1 comments

What:

Add support for Protocol Buffer imports and schema references.

I believe this is a feature change without any breaking changes.

Why:

Imports are very useful when writing schema files using Protocol Buffers. And in order to register a schema that uses imports, we need to use Schema Register's references feature to let it know where to find those imported types. Otherwise, it fails to register the schema.

Previous work:

I believe this relates to #82

There are two other related PRs open in #92 and #121. As far as I can tell, the first extends getSchema to retrieve references from Schema Registry. The second extends register() by asking the user to provide the reference schema ids in userOpts. This PR attempts to extend register() to automatically register needed references when registering Protocol Buffer schemas, and tries to make getSchema() create a Protocol Buffer schema instance that includes these references.

How:

When registering a new schema, the register method now makes use of a new user-provided fetchSchema helper to fetch definitions of imported schemas, as well as a new referencedSchemas SchemaHelper that this PR implements for Protocol Buffers, but which could also be implemented for at least JSON schemas.

The fetchSchema helper tells us how to fetch schema definitions, so we can get the definitions of any imported files. The user of the library decides how to implement this helper, but I guess a common pattern would be to read the proto from a location on disk.

The referencedSchemas SchemaHelper tells us which other schemas are referenced. This PR implements it for Protos and leaves it returning [] for AVRO and JSON schemas. It could be implemented for other schema formats now or later.

   referencedSchemas(schema: string): Promise<string[]>

The register method is then extended to use these helpers to get the schema references needed, use get their schema definitions, make sure those are all registered recursively and then included as the references of the newly registered schema.

Schema registry references are a list of [(Subject, Version)] where the Subject matches the imported name. For protocol buffers, the Subject is the name of the imported protocolbuffer.

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#referenced-schemas

To get define and encode to work, I extended the ProtoSchema constructor so it can take references into account when building the schema instance. It fetches the Schema instances for the references and adds them to the Root context for the new Schema instance, so it has all the types it needs.

   constructor(schema: ConfluentSchema, opts?: ProtoOptions, references?: Schema[])

I then extended schemaFromConfluentSchema to pass through the referenced schema.

Testing

I have added tests to validate behaviour in a few cases:

  1. Register and encode protocol buffer v3 schema
  2. Register and encode protocol buffer v3 schema with import. The imported proto has an import itself, to check the recursive registration.

TODO before merging

  1. Need to check whether it is safe to reuse the argument for this.root.add() or whether it assumes ownership over its input argument and mutates it.
  2. Need to check whether we need to explicitly provide "common" imports and whether we can "cheat" and do so via the protobufjs library: https://github.com/protobufjs/protobuf.js/blob/master/tests/data/common.proto

Future work

In order to use this new functionality, we should implement support for Message Indexes. Since Protocol Buffers can contain multiple type definitions in a single schema file, each encoded Kafka message includes a Message Index that tells the consumer which Protocol Buffer Message type to use for parsing the message. I think this feature will either need to be feature flagged, or a breaking change, since it changes the binary wire format.

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

brinchj avatar Jul 08 '21 14:07 brinchj

Tested it out, seems to be the only solution for imports atm.

few issues here:

  1. It does not work with the { [SchemaType.PROTOBUF]: { messageName: 'CustomMessage' }} option. Tries to look for the messageName from every proto definition, should only be looking from the root.

  2. When defintion inherits some type through multiple different files, then it will get an error. Like following example, A inherits messageD from B and C:

A.proto

syntax = "proto3";
package testproto;

import "B.proto";
import "C.proto";

message messageA {
  string id = 1;
  messageB values = 2;
  messageC values = 3;
}

B.proto

syntax = "proto3";
package testproto;

import "D.proto";

message messageB {
  string id = 1;
  messageD meta = 2;
  string value = 3;
}

C.proto

syntax = "proto3";
package testproto;

import "D.proto";

message messageC {
  string id = 1;
  messageD meta = 2;
  repeated messageD values = 2;
}

D.proto

syntax = "proto3";
package testproto;

message messageD {
  string user = 1;
  string time = 2;
}

Error would be something like this: ConfluentSchemaRegistryArgumentError: duplicate name 'messageD' in Namespace testproto The error originates from ProtoSchema file around line this.root.addJSON(copy)

maitsarv avatar Jan 21 '22 08:01 maitsarv