ProtobufSerializer Error: message type name is empty
Description
When attempting to serialize messages using the ProtobufSerializer, an error is always thrown.
Environment Information
- OS Mac M1 Sonoma 14.6.1
- Node Version 20.18.1
- NPM Version 10.8.2
- confluent-kafka-javascript version 1.0.0
Steps to Reproduce
Execute the code below and it will throw the following error:
Error: message type name is empty
at ProtobufSerializer.serialize (/Users/[email protected]/Develop/other/confluent-kafka-javascript/schemaregistry/serde/protobuf.ts:146:13)
at Object.<anonymous> (/Users/[email protected]/Develop/other/confluent-kafka-javascript/schemaregistry/e2e/schemaregistry-proto.spec.ts:84:31)
at processTicksAndRejections (node:internal/process/task_queues:95:5)
Code to reproduce: a test example has been created in https://github.com/confluentinc/confluent-kafka-javascript/pull/218#issue-2745223459
const schemaString: string = "\n" +
"syntax = \"proto3\";\n" +
"message TestMessageValue {\n" +
" string name = 1;\n" +
" int32 age = 2;\n" +
"\n" +
"}\n";
const messageValue = {
"name": "Bob Jones",
"age": 25
};
const schemaInfo: SchemaInfo = {
schema: schemaString,
schemaType: 'PROTOBUF'
};
const testTopic = "test";
const schemaRegistryClient = new SchemaRegistryClient(clientConfig);
await schemaRegistryClient.register(testTopic + "-value", schemaInfo);
serializerConfig = { useLatestVersion: true };
serializer = new ProtobufSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
deserializer = new ProtobufDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
const outgoingMessage = {
key: 'key',
value: await serializer.serialize(testTopic, messageValue)
};
Additional context
When trying to serialize a message using the ProtobufSerializer, an error is thrown. The expected behavior is that serialize successfully serializes the message.
Note: If the above code is changed to include the $typeName then the error changes to the following:
Error: message descriptor not in registry
at ProtobufSerializer.serialize (/Users/[email protected]/Develop/other/confluent-kafka-javascript/schemaregistry/serde/protobuf.ts:150:13)
at Object.<anonymous> (/Users/[email protected]/Develop/other/confluent-kafka-javascript/schemaregistry/e2e/schemaregistry-proto.spec.ts:84:31)
at processTicksAndRejections (node:internal/process/task_queues:95:5)
Code to reproduce:
const outgoingMessage = {
key: 'key',
value: await serializer.serialize(testTopic, { ...messageValue, $typeName: "TestMessageValue"})
};
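For anyone trying to make sense of the two errors: the stack traces (protobuf.ts:146 and :150) suggest two consecutive checks, first that the message carries a type name and then that the name resolves in the serializer's registry. Below is a minimal sketch of that behavior under that assumption. TypeRegistry, resolveDescriptor and errorMessage are hypothetical names used for illustration only; this is not the library's actual code.

```typescript
// Hypothetical, simplified model of the two checks implied by the stack traces.
// None of these names come from confluent-kafka-javascript.
type Descriptor = { typeName: string };

class TypeRegistry {
  private byName = new Map<string, Descriptor>();

  add(desc: Descriptor): void {
    this.byName.set(desc.typeName, desc);
  }

  get(typeName: string): Descriptor | undefined {
    return this.byName.get(typeName);
  }
}

function resolveDescriptor(registry: TypeRegistry, msg: Record<string, unknown>): Descriptor {
  // Check 1: a plain object literal has no $typeName at all.
  const typeName = msg["$typeName"];
  if (typeof typeName !== "string" || typeName === "") {
    throw new Error("message type name is empty");
  }
  // Check 2: the name must have been added to the registry beforehand.
  const desc = registry.get(typeName);
  if (desc === undefined) {
    throw new Error("message descriptor not in registry");
  }
  return desc;
}

// Returns the error message thrown by fn, or null if fn succeeds.
function errorMessage(fn: () => unknown): string | null {
  try {
    fn();
    return null;
  } catch (e) {
    return (e as Error).message;
  }
}

const registry = new TypeRegistry();
const plain = { name: "Bob Jones", age: 25 };

// No $typeName: fails the first check.
const first = errorMessage(() => resolveDescriptor(registry, plain));
// $typeName set but nothing registered: fails the second check.
const second = errorMessage(() =>
  resolveDescriptor(registry, { ...plain, $typeName: "TestMessageValue" })
);
// After registering the type, both checks pass.
registry.add({ typeName: "TestMessageValue" });
const third = errorMessage(() =>
  resolveDescriptor(registry, { ...plain, $typeName: "TestMessageValue" })
);
```

If this model is right, adding $typeName alone only moves the failure from the first check to the second, which matches the two errors reported above.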
@baumac , are you using Schema Registry from CP? If so, which version?
@rayokota yup was using the confluentinc/cp-schema-registry:7.7.1 docker image.
I also opened up a PR here that contains a test case that replicates the error https://github.com/confluentinc/confluent-kafka-javascript/pull/218
@baumac, for Protobuf, this library needs changes from https://github.com/confluentinc/schema-registry/pull/3276, which will be in the next CP patch releases.
@rayokota that's great news, is there a ticket or GH issue we can be tracking to know when the Protobuf Serde tooling will be working in the @confluentinc/schemaregistry package?
I'm having the same issue, but I'm using Confluent Cloud so I'm not really certain what version of Schema Registry it's using. Assuming the above-referenced changes are even available there, there are also changes that need to be made here, correct? I would be interested in contributing to fix this since the other JS client is completely broken in this regard and this at least seems more cohesive and close to being complete, and this functionality is crucial to me.
Ok, I'll try to look into this again when I get a chance
Sorry, I misunderstood the issue earlier. It still holds true that this needs changes from https://github.com/confluentinc/schema-registry/pull/3276. (These changes are in Confluent Cloud.) Otherwise, the example will work with the following:
Types need to be registered directly in the serializer's registry, as follows:
serializer.registry.add(TestMessageSchema)
So the following works (note the $typeName):
serializerConfig = { useLatestVersion: true };
serializer = new ProtobufSerializer(schemaRegistryClient, SerdeType.VALUE, serializerConfig);
serializer.registry.add(TestMessageSchema)
deserializer = new ProtobufDeserializer(schemaRegistryClient, SerdeType.VALUE, {});
const outgoingMessage = {
key: 'key',
value: await serializer.serialize(testTopic, {...messageValue, $typeName: "test.TestMessage"})
};
EDIT: Instead of manually setting $typeName like I showed below, it is much better to just use the create method with the schema generated by @bufbuild. Using the sample code from below, it would look like:
import { create } from '@bufbuild/protobuf';
import { TestSchema } from './gen/Test_pb';
// Sets up the client and serializer/registry
const schemaRegistryClient = new SchemaRegistryClient(clientConfig); // clientConfig as in the original report
const serializer = new ProtobufSerializer(schemaRegistryClient, SerdeType.VALUE, {
useLatestVersion: true,
});
serializer.registry.add(TestSchema);
// Serializes a message
const rawMessage = {
meta: {
foo: 'foo',
},
bar: 42,
};
const message = create(TestSchema, rawMessage);
const buf = await serializer.serialize('test_topic', message); // Properly encoded
IMO this step of calling create should probably be done by this library, as at that point the schemas have already been added to the library. The only thing left to do is get the topic-schema subject association and then use the correct schema. But for now, this works.
ORIGINAL: Thank you very much for the guidance. With your help and some digging, I was able to piece it together and get everything working.
Some notes for those perhaps coming behind me (please correct me if I'm wrong):
- JS/TS schemas must (as of now) be generated using @bufbuild/protobuf. Their docs are great and really amount to creating a config YAML file and running npx buf generate. This generates proper Protobuf descriptors and schema representations in JS/TS, as you may be familiar with if you've used Protobuf in other, compiled languages.
- Those schemas must then be imported and added to the ProtobufSerializer's registry as shown in the above comment. There is no default export on the generated schema files, but the convention is MessageName + Schema, so message Test { becomes TestSchema.
- $typeName must be attached to the object being serialized as shown in the above comment. I'm not 100% but I think they must use the fully-qualified name (e.g. com.foo.bar.MessageTypeName as opposed to just MessageTypeName).
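To make the two naming rules in the notes above concrete, here is a tiny sketch. schemaExportName and fullyQualifiedName are hypothetical helpers written for this comment, not part of @bufbuild/protobuf or this library, and they only cover top-level messages as described above.

```typescript
// Hypothetical helpers illustrating the naming rules described in the notes.

// Generated export name: message name + "Schema" (message Test -> TestSchema).
function schemaExportName(messageName: string): string {
  return `${messageName}Schema`;
}

// Fully-qualified type name: package + "." + message name.
// A message declared without a package is just its own name.
function fullyQualifiedName(pkg: string, messageName: string): string {
  return pkg === "" ? messageName : `${pkg}.${messageName}`;
}

const exportName = schemaExportName("Test");
const qualified = fullyQualifiedName("com.foo.bar", "MessageTypeName");
const unpackaged = fullyQualifiedName("", "TestMessageValue");
```

So for package test and message Test, the import would be TestSchema and the $typeName would be test.Test, assuming the fully-qualified form is indeed what is required.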
One last thing I found through a ton of digging:
- If you are using nested message types in your schema, you must also attach $typeName to those as well. For example, given:
package test;
message Meta {
string foo = 1;
}
And in another schema:
package test;
import "Meta.proto";
message Test {
test.Meta meta = 1;
int64 bar = 2;
}
The serialized object must look like this, noting the additional $typeName on meta:
{
"$typeName": "test.Test",
"meta": {
"$typeName": "test.Meta",
"foo": "test"
},
"bar": 42
}
If you don't, a ForeignFieldError will be generated.
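If you do end up attaching $typeName by hand, the nesting can be handled with a small recursive helper. This is only a sketch: TypeSpec and withTypeNames are hypothetical names, and the spec describing which fields hold nested messages has to be written by hand here (it is an assumption of this example, not something the library provides). Repeated and map fields are not handled.

```typescript
type Plain = { [key: string]: unknown };

// Hypothetical, hand-written description of a message type:
// its fully-qualified name plus the fields that hold nested messages.
interface TypeSpec {
  typeName: string;
  fields?: { [fieldName: string]: TypeSpec };
}

// Returns a copy of value with $typeName attached at every message level.
function withTypeNames(value: Plain, spec: TypeSpec): Plain {
  const out: Plain = { $typeName: spec.typeName };
  for (const [key, fieldValue] of Object.entries(value)) {
    const nested = spec.fields?.[key];
    const isPlainObject =
      typeof fieldValue === "object" && fieldValue !== null && !Array.isArray(fieldValue);
    if (nested !== undefined && isPlainObject) {
      // Field holds a nested message: tag it recursively.
      out[key] = withTypeNames(fieldValue as Plain, nested);
    } else {
      // Scalar (or unhandled) field: copy as-is.
      out[key] = fieldValue;
    }
  }
  return out;
}

// Mirrors the Test / Meta schemas above.
const testSpec: TypeSpec = {
  typeName: "test.Test",
  fields: { meta: { typeName: "test.Meta" } },
};

const tagged = withTypeNames({ meta: { foo: "test" }, bar: 42 }, testSpec);
const taggedJson = JSON.stringify(tagged);
```

As noted in the EDIT above, using create from @bufbuild/protobuf with the generated schema avoids this manual tagging, so a helper like this is mostly useful for understanding what shape the serializer expects.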
Using serializer.registry.add worked for me too, but I'm surprised that this isn't documented and forces everybody to use @bufbuild/protobuf.
In my case, that's not an issue as that's what I use anyway, but is this a hack or the official way of using the ProtobufSerializer?