
protobuf schema doesn't parse protobuf wire protocol correctly.

Open ideasculptor opened this issue 2 years ago • 18 comments

There are a whole slew of problems. First and foremost, the implementation isn't honoring the wire protocol definition - partly because the docs are structured ridiculously. They document the wire format for Avro in a table, making it appear that the table is the whole wire format. But after the table, a paragraph goes on to describe how protobuf adds an extra array of descriptor indexes to the header, before the protobuf data. Look after the green box on this page: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

The protobuf schema implementation completely ignores the array of message descriptor indexes and instead just searches the buffer for the first non-zero byte. But even the first message in a protobuf file can be encoded with the array [1, 0] rather than just [0] - that is, an array of length 1 containing message index 0, instead of an array of length 0 with an assumed message index of 0. Note also that the integers in the array, including the length, are variable-length zig-zag encoded, not plain network-byte-order integers, so you have to parse them accordingly and cannot assume 4 bytes per integer.
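
To make the decoding side concrete, here is a minimal sketch (hypothetical helper names, not library code) of reading that zigzag-varint message-index array, which starts right after the 5-byte magic-byte/schema-id prefix:

```typescript
// Read one zigzag-encoded varint starting at pos; return the value and the new position.
function readZigZagVarint(buf: Buffer, pos: number): { value: number; pos: number } {
  let shift = 0
  let raw = 0
  for (;;) {
    const b = buf[pos++]
    raw |= (b & 0x7f) << shift          // accumulate 7 payload bits per byte
    if ((b & 0x80) === 0) break         // high bit clear = last byte of varint
    shift += 7
  }
  return { value: (raw >>> 1) ^ -(raw & 1), pos } // undo zigzag encoding
}

// Read the message-index array: a zigzag-varint length followed by that many
// zigzag-varint index values.
function readMsgIndexes(buf: Buffer, pos: number): { indexes: number[]; pos: number } {
  const len = readZigZagVarint(buf, pos)
  const indexes: number[] = []
  let p = len.pos
  for (let i = 0; i < len.value; i++) {
    const r = readZigZagVarint(buf, p)
    indexes.push(r.value)
    p = r.pos
  }
  return { indexes, pos: p }
}
```

Note how this handles both encodings mentioned above: a single 0x00 byte decodes to an empty array (implied index 0), while the bytes 0x02 0x00 (zigzag of 1, then zigzag of 0) decode to the explicit array [0].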

Then there's the problem that things other than messages (Types) can be declared within a .proto file. So even if the indexes were honored in the recursive loop implemented by getNestedTypeName(), its exit condition only checks for Type and Namespace, while it is entirely possible to encounter an Enum, a Service, or a Method. It is therefore necessary to iterate over the keys in the parent until you find either a Namespace or a Type, and then continue traversing the descriptor hierarchy from there, rather than always assuming the first key is the one to use.

This is relatively simple to implement: just iterate over the keys until you find one which is an instanceof Type or Namespace.

Finally, the code assumes that there is a parsedMessage.package field, which may be there when parsing a .proto string, but is definitely NOT there when parsing a JSON protobuf descriptor. When the root is created from a JSON descriptor, you have to derive the package name by iterating through the Namespace declarations in the parent hierarchy. It'd be great if the package made it into the JSON descriptor, but until it does, it is probably safer to determine the package name dynamically rather than taking it from the parser, since it could disappear from parsed protos as easily as it could be added to JSON descriptors.

if (reflection instanceof Namespace && !(reflection instanceof Type) && reflection.nested) {
    return reflection.name + '.' + this.getNestedTypeName(reflection.nested)
}
return keys[0]

This will return the fully qualified name without relying on the package string.
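
Here is a self-contained sketch of that traversal. The classes below are minimal stand-ins for protobufjs's reflection classes (in protobufjs, Type extends Namespace), just so the logic is runnable in isolation - the real fix would use the actual protobufjs imports:

```typescript
// Mock stand-ins for protobufjs reflection classes, for illustration only.
class ReflectionObject {
  constructor(public name: string) {}
}
class Namespace extends ReflectionObject {
  constructor(name: string, public nested?: Record<string, ReflectionObject>) {
    super(name)
  }
}
class Type extends Namespace {}   // messages; in protobufjs, Type extends Namespace
class Enum extends ReflectionObject {} // enums are NOT traversable namespaces

// Walk the descriptor hierarchy, skipping Enums/Services/etc. instead of
// blindly taking the first key, and return the fully qualified type name.
function getNestedTypeName(nested: Record<string, ReflectionObject>): string {
  for (const key of Object.keys(nested)) {
    const reflection = nested[key]
    if (reflection instanceof Type) {
      return reflection.name
    }
    if (reflection instanceof Namespace && reflection.nested) {
      return reflection.name + '.' + getNestedTypeName(reflection.nested)
    }
    // anything else (Enum, Service, Method) is skipped, not returned
  }
  throw new Error('no message type found in descriptor')
}
```

With an Enum declared before the package namespace, this still resolves the message correctly instead of returning the enum's key.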

I'm going to take a stab at fixing the code in ProtoSchema.ts and submitting a PR to fix all of that (and the corresponding changes in the serializer which will generate the correct array by walking the descriptor hierarchy).

ideasculptor avatar Sep 23 '21 22:09 ideasculptor

It's also worth pointing out that the current implementation will not work at all for any protobuf that references another protobuf via an import statement, since it parses only the schema stored by the registry and not any schemas referenced by that schema.
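
A fix for the import problem would have to fetch the referenced schemas transitively before parsing. Sketching the shape of that (the RegisteredSchema interface and fetcher signature here are assumptions modeled on the registry's reference metadata, not the actual client API):

```typescript
// Assumed shape of a registry response carrying reference metadata.
interface RegisteredSchema {
  schema: string
  references?: { name: string; subject: string; version: number }[]
}

// Injected fetcher so the traversal is independent of any particular client.
type FetchBySubjectVersion = (subject: string, version: number) => Promise<RegisteredSchema>

// Collect every schema the root (transitively) imports, keyed by reference
// name, so the whole set can be handed to the protobuf parser up front.
async function collectReferencedSchemas(
  root: RegisteredSchema,
  fetch: FetchBySubjectVersion,
  out: Map<string, string> = new Map()
): Promise<Map<string, string>> {
  for (const ref of root.references ?? []) {
    if (out.has(ref.name)) continue // already fetched; avoids cycles and duplicates
    const dep = await fetch(ref.subject, ref.version)
    out.set(ref.name, dep.schema)
    await collectReferencedSchemas(dep, fetch, out) // recurse into its references
  }
  return out
}
```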

ideasculptor avatar Sep 27 '21 15:09 ideasculptor

+1

It's missing the "message-indexes" part.


xnog avatar Oct 01 '21 10:10 xnog

I have code which encodes it correctly. It isn't integrated with the schema registry client - I built a separate serializer, so I use the schema registry client only to talk to the registry. The code looks like this:

    private encodePayloadHeader(schemaInfo: SchemaInfo, msgIndexes: number[]): ProtoBuffer {
        let writer = new BufferWriter()
                .uint32(0) // don't zig zag encode. uint32 writes variable number of bytes, so this is 1 byte
                .fixed32(NetworkOrder(schemaInfo.schemaId)) // make big-endian, then write 4 bytes int32
                .sint32(msgIndexes.length) // zig-zag encoded array length

        for (const msgIndex of msgIndexes) {
            writer.sint32(msgIndex) // zig-zag encoded index value
        }
        return writer.finish()
    }

I generate the full buffer more or less like this:

        const buf = this.encodePayloadHeader(schemaInfo, msgIndexes)
        const writer = schemaInfo.type.encode(thing)
        return Buffer.concat([buf, writer.finish()])
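
For anyone who wants the same header without pulling in protobufjs's BufferWriter, here is a dependency-free sketch using only Node's Buffer. It assumes the Confluent wire format described above: magic byte 0, 4-byte big-endian schema id, then the zigzag-varint index array:

```typescript
// Encode one value as a zigzag varint and return its bytes.
function zigzagVarint(n: number): number[] {
  let v = ((n << 1) ^ (n >> 31)) >>> 0 // zigzag, kept unsigned for the varint loop
  const bytes: number[] = []
  while (v > 0x7f) {
    bytes.push((v & 0x7f) | 0x80) // set continuation bit on all but the last byte
    v >>>= 7
  }
  bytes.push(v)
  return bytes
}

// Build the Confluent payload header: magic byte, schema id, message indexes.
function encodeHeader(schemaId: number, msgIndexes: number[]): Buffer {
  const head = Buffer.alloc(5)
  head.writeUInt8(0, 0)          // magic byte
  head.writeInt32BE(schemaId, 1) // schema id in network byte order
  const tail = zigzagVarint(msgIndexes.length)
  for (const i of msgIndexes) tail.push(...zigzagVarint(i))
  return Buffer.concat([head, Buffer.from(tail)])
}
```

For the common case of the first top-level message, `encodeHeader(id, [])` yields the 6-byte minimum header ending in a single 0x00.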

ideasculptor avatar Oct 01 '21 17:10 ideasculptor

On the decode side, it looks like this:

    private decodeHeader(topic: string, buffer: Buffer): ProtoInfo {
        let bufferReader = Reader.create(buffer)
        const magicByte = bufferReader.uint32()
        const schemaId = HostOrder(bufferReader.fixed32())
        const arrayLen = bufferReader.sint32()
        const msgIndexes = new Array<number>(arrayLen)
        for (let i = 0; i < arrayLen; i++) {
            msgIndexes[i] = bufferReader.sint32()
        }
        return {
            magicByte: magicByte,
            schemaId: schemaId,
            msgIndexes: msgIndexes,
            bytesRead: bufferReader.pos,
        }
    }

    public async deserialize(topic: string, buffer: Buffer): Promise<Message<{}>> {
        if (buffer.length < 6) {
            throw new Error(`buffer with length ${buffer.length} is not long enough to contain a protobuf`)
        }
        const protoInfo = this.decodeHeader(topic, buffer)

        const type = await this.protobufResolver.ResolveProtobuf(topic, protoInfo.schemaId, protoInfo.msgIndexes)
        let bufferReader = Reader.create(buffer)
        bufferReader.skip(protoInfo.bytesRead)
        return type.decode(bufferReader)
    }

protobufResolver uses the registry client, the topic name, and the info parsed from the wire protocol to resolve a protobuf Type instance, which is then used to decode the payload. That lets me inject whatever logic I want into the deserializer via protobufResolver for figuring out the type encoded in the payload, since correctly computing the message type from message indexes isn't really possible with protobufjs as it is currently implemented - at least not if your .proto files import other protobufs, since the schema fetched from the registry won't include the references. By delegating to a resolver, I can resort to quick hacks like hardcoding the type name based on the indexes and topic name, for example.
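
The resolver interface implied by the code above might look like the following. This is a guess at its shape, generic over whatever Type your protobuf runtime uses, with the "quick hack" variant mentioned - hardcoding the topic-plus-indexes mapping - as an example implementation:

```typescript
// Assumed resolver contract: map wire-protocol info to a decodable type.
interface ProtobufResolver<T> {
  ResolveProtobuf(topic: string, schemaId: number, msgIndexes: number[]): Promise<T>
}

// Hardcoded resolver: ignores the registry entirely and looks the type up
// by "topic:index.index..." in a preconfigured map.
class HardcodedResolver<T> implements ProtobufResolver<T> {
  constructor(private types: Map<string, T>) {}

  async ResolveProtobuf(topic: string, _schemaId: number, msgIndexes: number[]): Promise<T> {
    const key = `${topic}:${msgIndexes.join('.')}`
    const type = this.types.get(key)
    if (!type) throw new Error(`no type registered for ${key}`)
    return type
  }
}
```

A registry-backed resolver would implement the same interface but fetch and parse the schema (and its references) by schemaId before walking the message indexes.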

ideasculptor avatar Oct 01 '21 17:10 ideasculptor

I'll probably open-source the serializer and deserializer I built, but it's not likely to happen for a week or two.

ideasculptor avatar Oct 01 '21 17:10 ideasculptor

Hi @ideasculptor, got the same issue. Looking forward to your fixes. Many thanks.

e10101 avatar Nov 21 '21 15:11 e10101

Same problem, any ETA for a fix?

tzahibena-simplex avatar Jan 24 '22 06:01 tzahibena-simplex

Same issue, do we have an ETA for this fix?

MattMakes avatar Apr 12 '22 20:04 MattMakes

I'll probably open-source the serializer and deserializer I built, but it's not likely to happen for a week or two.

@ideasculptor have you had the chance of working on a PR to address this issue? I have recently started using kafkajs/confluent-schema-registry and I can confirm that the library works for JSON and AVRO types, but fails for PROTOBUF.

msempere avatar Jul 27 '22 13:07 msempere

Any update on this?

nicoga97 avatar Jan 27 '23 15:01 nicoga97

@ideasculptor Do you plan to proceed with it?

NikitaKemarskiyLeia avatar Mar 28 '23 18:03 NikitaKemarskiyLeia

Oops, I wrote a comment thinking this was the golang version of the client. @NikitaKemarskiyLeia - it seems unlikely that I'm going to submit an actual PR given how long it has been since I touched this. But the code I pasted above has been working for us very well for quite some time and is cross-compatible with other kafka clients in other languages that do have full protobuf schema registry support, so you've got everything you need to implement your own serialization and deserialization.

Admittedly, copying and pasting those few dozen lines of code is sub-optimal, but having to dig back into this codebase and figure out how to integrate it properly for a PR would be a fair bit of work for me, and it is pretty far down my priority queue at the moment, so I wouldn't hold your breath.

ideasculptor avatar Mar 28 '23 19:03 ideasculptor

@ideasculptor Thanks for the response, I'll probably work on this PR myself then. Your comment was also useful – I have applied your fix to my code (until the bug is fixed in the library).

NikitaKemarskiyLeia avatar Mar 29 '23 09:03 NikitaKemarskiyLeia

Hi @NikitaKemarskiyLeia, did you get a chance to work on the PR?

Meet-Modi avatar Jun 28 '23 02:06 Meet-Modi

@Meet-Modi Unfortunately I have no time for such a contribution now :( I'm using the solution provided by @ideasculptor

NikitaKemarskiyLeia avatar Jun 29 '23 17:06 NikitaKemarskiyLeia

@tulios @Nevon Looks like a lot of people are facing this issue when communicating data across different services, and it has been pending for more than 1.5 years. It would be great if this could be picked up in the next release, considering the impact. Thanks in advance.

itsdeekay2 avatar Jul 03 '23 13:07 itsdeekay2

@ideasculptor could you please help with the definition of this.protobufResolver.ResolveProtobuf used to get the type?

itsdeekay2 avatar Jul 04 '23 06:07 itsdeekay2

@ideasculptor hello, thank you a lot for sharing the workaround. Could you please advise how to fetch the message indexes, or share your codebase?

@NikitaKemarskiyLeia might be you can help

mkoiev avatar Apr 23 '24 14:04 mkoiev