confluent-schema-registry
Implement support for Protocol Buffer references (imports)
What:
Add support for Protocol Buffer imports and schema references.
I believe this is a feature change without any breaking changes.
Why:
Imports are very useful when writing schema files using Protocol Buffers. To register a schema that uses imports, we need to use Schema Registry's references feature to tell it where to find the imported types. Otherwise, the schema fails to register.
Previous work:
I believe this relates to #82
There are two other related PRs open in #92 and #121. As far as I can tell, the first extends getSchema to retrieve references from Schema Registry. The second extends register() by asking the user to provide the reference schema ids in userOpts. This PR attempts to extend register() to automatically register needed references when registering Protocol Buffer schemas, and tries to make getSchema() create a Protocol Buffer schema instance that includes these references.
How:
When registering a new schema, the register method now makes use of a new user-provided fetchSchema helper to fetch definitions of imported schemas, as well as a new referencedSchemas SchemaHelper that this PR implements for Protocol Buffers, but which could also be implemented for at least JSON schemas.
The fetchSchema helper tells us how to fetch schema definitions, so we can get the definitions of any imported files. The user of the library decides how to implement this helper, but I guess a common pattern would be to read the proto from a location on disk.
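As an illustration of the read-from-disk pattern, here is a minimal sketch. It assumes the helper receives the import name and resolves to the schema text. That shape, the makeFetchSchema factory and the protoRoot parameter are hypothetical names for this example, not part of the PR.

```typescript
import { promises as fs } from 'fs'
import * as path from 'path'

// Hypothetical factory: returns a fetchSchema-style helper that resolves
// import names relative to a root directory containing .proto files.
export function makeFetchSchema(protoRoot: string) {
  return async function fetchSchema(importName: string): Promise<string> {
    const fullPath = path.resolve(protoRoot, importName)
    // Refuse names that escape the root directory (e.g. "../other.proto").
    const relative = path.relative(protoRoot, fullPath)
    if (relative === '..' || relative.startsWith('..' + path.sep) || path.isAbsolute(relative)) {
      throw new Error(`Import "${importName}" resolves outside ${protoRoot}`)
    }
    return fs.readFile(fullPath, 'utf8')
  }
}
```

A caller would create the helper once, for example makeFetchSchema('./protos'), and hand it to the registry wherever the PR expects fetchSchema.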
The referencedSchemas SchemaHelper tells us which other schemas are referenced. This PR implements it for Protos and leaves it returning [] for AVRO and JSON schemas. It could be implemented for other schema formats now or later.
referencedSchemas(schema: string): Promise<string[]>
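To make the contract concrete, here is a rough sketch of a helper with that signature for proto sources. It scans for import statements with a regular expression, which is a simplification I chose for the example and not necessarily how this PR extracts imports. The parseImports name is hypothetical. Imports that appear inside block comments would be picked up by mistake.

```typescript
// Matches `import "x.proto";`, `import public "x.proto";` and
// `import weak "x.proto";` at the start of a line (single or double quotes).
const IMPORT_PATTERN = /^\s*import\s+(?:public\s+|weak\s+)?(["'])([^"']+)\1\s*;/gm

// Hypothetical synchronous core: returns import names in order, without duplicates.
export function parseImports(schema: string): string[] {
  const names: string[] = []
  const re = new RegExp(IMPORT_PATTERN.source, 'gm') // fresh regex, so lastIndex starts at 0
  let match: RegExpExecArray | null
  while ((match = re.exec(schema)) !== null) {
    if (names.indexOf(match[2]) === -1) names.push(match[2])
  }
  return names
}

// Async wrapper with the same shape as the SchemaHelper method above.
export async function referencedSchemas(schema: string): Promise<string[]> {
  return parseImports(schema)
}
```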
The register method is then extended to use these helpers to find the schema references needed, fetch their schema definitions, make sure those are all registered recursively, and then include them as the references of the newly registered schema.
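The recursive flow can be sketched roughly as follows. This is not the code in the PR: registerOne stands in for the HTTP call to Schema Registry, and the Helpers and State types are hypothetical. The sketch registers each import once, dependencies first, and throws on cyclic imports.

```typescript
interface Reference { name: string; subject: string; version: number }

interface Helpers {
  fetchSchema: (name: string) => Promise<string>
  referencedSchemas: (schema: string) => Promise<string[]>
  // Stand-in for the registry call; resolves to the registered version.
  registerOne: (subject: string, schema: string, references: Reference[]) => Promise<number>
}

interface State { done: Map<string, Reference>; inProgress: Set<string> }

export async function registerWithReferences(
  subject: string,
  schema: string,
  helpers: Helpers,
  state: State = { done: new Map(), inProgress: new Set() }
): Promise<number> {
  if (state.inProgress.has(subject)) throw new Error(`Cyclic import involving ${subject}`)
  state.inProgress.add(subject)

  const references: Reference[] = []
  for (const name of await helpers.referencedSchemas(schema)) {
    let ref = state.done.get(name)
    if (!ref) {
      // Register the imported schema (and its own imports) before the importer.
      const imported = await helpers.fetchSchema(name)
      const version = await registerWithReferences(name, imported, helpers, state)
      ref = { name, subject: name, version }
      state.done.set(name, ref)
    }
    references.push(ref)
  }

  state.inProgress.delete(subject)
  return helpers.registerOne(subject, schema, references)
}
```

Keeping the done map across the whole traversal means a schema imported through several paths is registered only once.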
Schema Registry references are a list of [(Subject, Version)] where the Subject matches the imported name. For Protocol Buffers, the Subject is the name of the imported Protocol Buffer.
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#referenced-schemas
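For illustration, this is roughly what a registration request body with references could look like. In the Schema Registry REST API each reference entry carries a name, a subject and a version. Using the import name as the subject follows the convention described above. The toReference and buildRegisterBody helpers are hypothetical names for this sketch.

```typescript
interface SchemaReference { name: string; subject: string; version: number }

// Hypothetical helper: the import name doubles as the subject.
export function toReference(importName: string, version: number): SchemaReference {
  return { name: importName, subject: importName, version }
}

// Hypothetical helper: builds the JSON body for registering a Protobuf schema
// together with the references it imports.
export function buildRegisterBody(schema: string, references: SchemaReference[]) {
  return { schemaType: 'PROTOBUF', schema, references }
}

const body = buildRegisterBody('syntax = "proto3"; import "D.proto";', [
  toReference('D.proto', 1),
])
console.log(JSON.stringify(body, null, 2))
```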
To get define and encode to work, I extended the ProtoSchema constructor so it can take references into account when building the schema instance. It fetches the Schema instances for the references and adds them to the Root context for the new Schema instance, so it has all the types it needs.
constructor(schema: ConfluentSchema, opts?: ProtoOptions, references?: Schema[])
I then extended schemaFromConfluentSchema to pass through the referenced schema.
Testing
I have added tests to validate behaviour in a few cases:
- Register and encode protocol buffer v3 schema
- Register and encode protocol buffer v3 schema with import. The imported proto has an import itself, to check the recursive registration.
TODO before merging
- Need to check whether it is safe to reuse the argument for this.root.add() or whether it assumes ownership over its input argument and mutates it.
- Need to check whether we need to explicitly provide "common" imports and whether we can "cheat" and do so via the protobufjs library: https://github.com/protobufjs/protobuf.js/blob/master/tests/data/common.proto
Future work
In order to use this new functionality, we should implement support for Message Indexes. Since Protocol Buffers can contain multiple type definitions in a single schema file, each encoded Kafka message includes a Message Index that tells the consumer which Protocol Buffer Message type to use for parsing the message. I think this feature will either need to be feature flagged, or a breaking change, since it changes the binary wire format.
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
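To show what the change to the wire format would involve, here is a sketch of the framing as I understand it from the linked docs: a zero magic byte, a 4-byte big-endian schema id, the message indexes as zigzag varints prefixed by their count (with [0] shortened to a single 0 byte), then the Protobuf payload. The function names are hypothetical and none of this is implemented in this PR.

```typescript
// Zigzag-encode a signed 32-bit integer and append it as a base-128 varint.
function writeZigZagVarint(value: number, out: number[]): void {
  let n = ((value << 1) ^ (value >> 31)) >>> 0
  while (n > 0x7f) {
    out.push((n & 0x7f) | 0x80)
    n >>>= 7
  }
  out.push(n)
}

export function encodeMessageIndexes(indexes: number[]): Uint8Array {
  // Shortcut for the first message type in the file: a single 0 byte.
  if (indexes.length === 1 && indexes[0] === 0) return new Uint8Array([0])
  const out: number[] = []
  writeZigZagVarint(indexes.length, out)
  for (const index of indexes) writeZigZagVarint(index, out)
  return new Uint8Array(out)
}

export function frameMessage(schemaId: number, indexes: number[], payload: Uint8Array): Uint8Array {
  const indexBytes = encodeMessageIndexes(indexes)
  const framed = new Uint8Array(5 + indexBytes.length + payload.length)
  // framed[0] stays 0: the magic byte.
  new DataView(framed.buffer).setInt32(1, schemaId, false) // big-endian schema id
  framed.set(indexBytes, 5)
  framed.set(payload, 5 + indexBytes.length)
  return framed
}
```

A consumer would reverse these steps: skip the 5-byte header, read the index list, and use it to pick the message type before decoding the payload.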
Tested it out, seems to be the only solution for imports atm.
A few issues here:
- It does not work with the { [SchemaType.PROTOBUF]: { messageName: 'CustomMessage' }} option. It tries to look up the messageName in every proto definition, but it should only look in the root.
- When a definition inherits some type through multiple different files, it throws an error. In the following example, A inherits messageD from both B and C:
A.proto
syntax = "proto3";
package testproto;
import "B.proto";
import "C.proto";
message messageA {
string id = 1;
messageB values = 2;
messageC other_values = 3;
}
B.proto
syntax = "proto3";
package testproto;
import "D.proto";
message messageB {
string id = 1;
messageD meta = 2;
string value = 3;
}
C.proto
syntax = "proto3";
package testproto;
import "D.proto";
message messageC {
string id = 1;
messageD meta = 2;
repeated messageD values = 3;
}
D.proto
syntax = "proto3";
package testproto;
message messageD {
string user = 1;
string time = 2;
}
Error would be something like this: ConfluentSchemaRegistryArgumentError: duplicate name 'messageD' in Namespace testproto
The error originates in the ProtoSchema file, around the line this.root.addJSON(copy)
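One possible way around this, sketched under the assumption that every reference can be identified by a unique name, is to flatten the reference tree into a de-duplicated list before adding anything to the root, so that D is added only once. The SchemaNode type and uniqueReferences function are hypothetical and not part of the library.

```typescript
// Hypothetical shape: a schema plus the schemas it imports.
interface SchemaNode { name: string; references: SchemaNode[] }

// Returns every transitive reference exactly once, dependencies first.
export function uniqueReferences(root: SchemaNode): SchemaNode[] {
  const seen = new Set<string>()
  const ordered: SchemaNode[] = []
  const visit = (node: SchemaNode): void => {
    for (const ref of node.references) {
      if (seen.has(ref.name)) continue // already reached through another import path
      seen.add(ref.name)
      visit(ref)
      ordered.push(ref)
    }
  }
  visit(root)
  return ordered
}

// The diamond from the example above: A imports B and C, both import D.
const d: SchemaNode = { name: 'D.proto', references: [] }
const b: SchemaNode = { name: 'B.proto', references: [d] }
const c: SchemaNode = { name: 'C.proto', references: [d] }
const a: SchemaNode = { name: 'A.proto', references: [b, c] }
console.log(uniqueReferences(a).map((node) => node.name)) // [ 'D.proto', 'B.proto', 'C.proto' ]
```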